diff --git a/.gitignore b/.gitignore index 803e2c47..61e0932b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,10 @@ -.DS_Store -.idea/ -test_log/ -backup/ - -polaris/log/* - -angevil_test/ -test/other/other_test_suit.go +.DS_Store +.idea/ +test_log/ +backup/ + +polaris/log/* + +angevil_test/ +test/other/other_test_suit.go +*.log diff --git a/pkg/config/api.go b/pkg/config/api.go index 6bdebd7d..d8067242 100644 --- a/pkg/config/api.go +++ b/pkg/config/api.go @@ -76,17 +76,6 @@ type RateLimitConfig interface { IsEnable() bool //设置是否启用限流能力 SetEnable(bool) - //返回限流行为使用的插件 - //GetBehaviorPlugin(behavior RateLimitBehavior) string - //设置限流行为使用的插件 - //SetBehaviorPlugin(behavior RateLimitBehavior, p string) - SetMode(string) - - GetMode() model.ConfigMode - - SetRateLimitCluster(namespace string, service string) - - GetRateLimitCluster() ServerClusterConfig //获取最大限流窗口数量 GetMaxWindowSize() int //设置最大限流窗口数量 @@ -95,6 +84,10 @@ type RateLimitConfig interface { GetPurgeInterval() time.Duration //设置超时淘汰周期 SetPurgeInterval(time.Duration) + // GetRules + GetRules() []RateLimitRule + // SetRules + SetRules([]RateLimitRule) } //系统配置信息 diff --git a/pkg/config/circuitbreaker.go b/pkg/config/circuitbreaker.go index 91e8f53a..1e557fe2 100644 --- a/pkg/config/circuitbreaker.go +++ b/pkg/config/circuitbreaker.go @@ -33,7 +33,7 @@ type CircuitBreakerConfigImpl struct { //熔断器定时检查周期 CheckPeriod *time.Duration `yaml:"checkPeriod" json:"checkPeriod"` //熔断插件链 - Chain []string `yaml:"chain"` + Chain []string `yaml:"chain" json:"chain"` //熔断周期,被熔断后多久可以变为半开 SleepWindow *time.Duration `yaml:"sleepWindow" json:"sleepWindow"` //半开状态后最多分配多少个探测请求 diff --git a/pkg/config/ratelimiter.go b/pkg/config/ratelimiter.go index 8154e4e7..4fdc58e8 100644 --- a/pkg/config/ratelimiter.go +++ b/pkg/config/ratelimiter.go @@ -20,9 +20,8 @@ package config import ( "errors" "fmt" - "github.com/polarismesh/polaris-go/pkg/model" "github.com/polarismesh/polaris-go/pkg/plugin/common" - "strconv" + "strings" "time" ) @@ -32,14 +31,59 @@ type RateLimitConfigImpl struct { Enable *bool `yaml:"enable" json:"enable"` //各个限流插件的配置 Plugin PluginConfigs `yaml:"plugin" json:"plugin"` - // mode 0: local 1: global - Mode string `yaml:"mode" json:"mode"` - // rateLimitCluster - RateLimitCluster *ServerClusterConfigImpl `yaml:"rateLimitCluster" json:"rateLimitCluster"` //最大限流窗口数量 MaxWindowSize int `yaml:"maxWindowSize" json:"maxWindowSize"` //超时window检查周期 PurgeInterval time.Duration `yaml:"purgeInterval" json:"purgeInterval"` + //本地限流规则 + Rules []RateLimitRule `yaml:"rules"` +} + +// RateLimitRule 限流规则 +type RateLimitRule struct { + Namespace string `yaml:"namespace"` + Service string `yaml:"service"` + Labels map[string]Matcher `yaml:"labels"` + MaxAmount int `yaml:"maxAmount"` + ValidDuration time.Duration `yaml:"validDuration"` +} + +// Verify 校验限流规则 +func (r *RateLimitRule) Verify() error { + if len(r.Namespace) == 0 { + return errors.New("namespace is empty") + } + if len(r.Service) == 0 { + return errors.New("service is empty") + } + if len(r.Labels) > 0 { + for _, matcher := range r.Labels { + if len(matcher.Type) > 0 { + upperType := strings.ToUpper(matcher.Type) + if upperType != TypeExact && upperType != TypeRegex { + return errors.New(fmt.Sprintf("matcher.type should be %s or %s", TypeExact, TypeRegex)) + } + } + } + } + if r.ValidDuration < time.Second { + return errors.New("validDuration must greater than or equals to 1s") + } + if r.MaxAmount < 0 { + return errors.New("maxAmount must greater than or equals to 0") + } + return nil +} + +const ( + TypeExact = "EXACT" + TypeRegex = "REGEX" +) + +// Matcher 标签匹配类型 +type Matcher struct { + Type string `yaml:"type"` + Value string `yaml:"value"` } //是否启用限流能力 @@ -63,10 +107,11 @@ func (r *RateLimitConfigImpl) Verify() error { if nil == r.Enable { return fmt.Errorf("provider.rateLimit.enable must not be nil") } - if r.RateLimitCluster != nil { - if r.RateLimitCluster.GetNamespace() == ServerNamespace && - r.RateLimitCluster.GetService() == ForbidServerMetricService { - return errors.New("RateLimitCluster can not set to polaris.metric") + if len(r.Rules) > 0 { + for _, rule := range r.Rules { + if err := rule.Verify(); nil != err { + return err + } } } return r.Plugin.Verify() @@ -93,7 +138,6 @@ func (r *RateLimitConfigImpl) SetDefault() { r.PurgeInterval = DefaultRateLimitPurgeInterval } r.Plugin.SetDefault(common.TypeRateLimiter) - r.RateLimitCluster.SetDefault() } //设置插件配置 @@ -101,41 +145,11 @@ func (r *RateLimitConfigImpl) SetPluginConfig(pluginName string, value BaseConfi return r.Plugin.SetPluginConfig(common.TypeRateLimiter, pluginName, value) } -func (r *RateLimitConfigImpl) SetMode(mode string) { - r.Mode = mode -} - -func (r *RateLimitConfigImpl) GetMode() model.ConfigMode { - if r.Mode == model.RateLimitLocal { - return model.ConfigQuotaLocalMode - } else if r.Mode == model.RateLimitGlobal { - return model.ConfigQuotaGlobalMode - } else { - return model.ConfigQuotaLocalMode - } -} - -func (r *RateLimitConfigImpl) SetRateLimitCluster(namespace string, service string) { - if r.RateLimitCluster == nil { - r.RateLimitCluster = &ServerClusterConfigImpl{} - } - r.RateLimitCluster.SetNamespace(namespace) - r.RateLimitCluster.SetService(service) -} - -func (r *RateLimitConfigImpl) GetRateLimitCluster() ServerClusterConfig { - return r.RateLimitCluster -} - //配置初始化 func (r *RateLimitConfigImpl) Init() { + r.Rules = []RateLimitRule{} r.Plugin = PluginConfigs{} r.Plugin.Init(common.TypeRateLimiter) - r.Mode = strconv.Itoa(int(model.ConfigQuotaGlobalMode)) - r.RateLimitCluster = &ServerClusterConfigImpl{ - Namespace: "", - Service: "", - } } //GetMaxWindowSize @@ -157,3 +171,13 @@ func (r *RateLimitConfigImpl) GetPurgeInterval() time.Duration { func (r *RateLimitConfigImpl) SetPurgeInterval(v time.Duration) { r.PurgeInterval = v } + +// GetRules +func (r *RateLimitConfigImpl) GetRules() []RateLimitRule { + return r.Rules +} + +// GetRules +func (r *RateLimitConfigImpl) SetRules(rules []RateLimitRule) { + r.Rules = rules +} diff --git a/pkg/flow/quota/assist.go b/pkg/flow/quota/assist.go index 8839b7f8..467e9b96 100644 --- a/pkg/flow/quota/assist.go +++ b/pkg/flow/quota/assist.go @@ -18,18 +18,25 @@ package quota import ( + "errors" + "fmt" + "github.com/golang/protobuf/ptypes/duration" + "github.com/golang/protobuf/ptypes/wrappers" + "github.com/google/uuid" + "github.com/modern-go/reflect2" "github.com/polarismesh/polaris-go/pkg/clock" "github.com/polarismesh/polaris-go/pkg/config" "github.com/polarismesh/polaris-go/pkg/flow/data" "github.com/polarismesh/polaris-go/pkg/log" "github.com/polarismesh/polaris-go/pkg/model" + "github.com/polarismesh/polaris-go/pkg/model/pb" namingpb "github.com/polarismesh/polaris-go/pkg/model/pb/v1" "github.com/polarismesh/polaris-go/pkg/plugin" "github.com/polarismesh/polaris-go/pkg/plugin/common" - "github.com/polarismesh/polaris-go/pkg/plugin/serverconnector" - "github.com/modern-go/reflect2" + "strings" "sync" "sync/atomic" + "time" ) const ( @@ -52,17 +59,21 @@ type FlowQuotaAssistant struct { //服务到windowSet的映射 svcToWindowSet *sync.Map //限流server连接器 - asyncRateLimitConnector serverconnector.AsyncRateLimitConnector + asyncRateLimitConnector AsyncRateLimitConnector //任务列表 taskValues model.TaskValues - //通过配置获取的远程集群标识 - remoteClusterByConfig config.ServerClusterConfig //用来控制最大窗口数量的配置项 windowCount int32 maxWindowSize int32 windowCountLogCtrl uint64 //超时淘汰周期 purgeIntervalMilli int64 + //本地配置的限流规则 + localRules map[model.ServiceKey]model.ServiceRule +} + +func (f *FlowQuotaAssistant) AsyncRateLimitConnector() AsyncRateLimitConnector { + return f.asyncRateLimitConnector } func (f *FlowQuotaAssistant) Destroy() { @@ -95,16 +106,12 @@ func (f *FlowQuotaAssistant) TaskValues() model.TaskValues { func (f *FlowQuotaAssistant) Init(engine model.Engine, cfg config.Configuration, supplier plugin.Supplier) error { f.engine = engine f.supplier = supplier - connector, err := data.GetServerConnector(cfg, supplier) - if nil != err { - return err - } - f.asyncRateLimitConnector = connector.GetAsyncRateLimitConnector() + f.asyncRateLimitConnector = NewAsyncRateLimitConnector(engine.GetContext(), cfg) f.enable = cfg.GetProvider().GetRateLimit().IsEnable() if !f.enable { return nil } - callback, err := NewRemoteQuotaCallback(cfg, supplier, engine) + callback, err := NewRemoteQuotaCallback(cfg, supplier, engine, f.asyncRateLimitConnector) if nil != err { return err } @@ -124,13 +131,77 @@ func (f *FlowQuotaAssistant) Init(engine model.Engine, cfg config.Configuration, f.windowCount = 0 f.maxWindowSize = int32(cfg.GetProvider().GetRateLimit().GetMaxWindowSize()) f.windowCountLogCtrl = 0 - f.remoteClusterByConfig = cfg.GetProvider().GetRateLimit().GetRateLimitCluster() f.purgeIntervalMilli = model.ToMilliSeconds(cfg.GetProvider().GetRateLimit().GetPurgeInterval()) f.mutex = &sync.Mutex{} f.svcToWindowSet = &sync.Map{} + localRules := cfg.GetProvider().GetRateLimit().GetRules() + if len(localRules) > 0 { + if f.localRules, err = rateLimitRuleConversion(localRules); nil != err { + return err + } + } return nil } +func rateLimitRuleConversion(rules []config.RateLimitRule) (map[model.ServiceKey]model.ServiceRule, error) { + svcRules := make(map[model.ServiceKey]*namingpb.RateLimit) + for _, rule := range rules { + svcKey := model.ServiceKey{ + Namespace: rule.Namespace, + Service: rule.Service, + } + totalRule, ok := svcRules[svcKey] + if !ok { + totalRule = &namingpb.RateLimit{} + svcRules[svcKey] = totalRule + } + namingRule := &namingpb.Rule{ + Id: &wrappers.StringValue{Value: uuid.New().String()}, + Service: &wrappers.StringValue{Value: rule.Service}, + Namespace: &wrappers.StringValue{Value: rule.Namespace}, + Resource: namingpb.Rule_QPS, + Type: namingpb.Rule_LOCAL, + Amounts: []*namingpb.Amount{{ + MaxAmount: &wrappers.UInt32Value{Value: uint32(rule.MaxAmount)}, + ValidDuration: &duration.Duration{Seconds: int64(rule.ValidDuration / time.Second)}, + }}, + Action: &wrappers.StringValue{Value: config.DefaultRejectRateLimiter}, + } + if len(rule.Labels) > 0 { + namingRule.Labels = make(map[string]*namingpb.MatchString) + for key, matcher := range rule.Labels { + var matcherType = namingpb.MatchString_EXACT + if strings.ToUpper(matcher.Type) == + namingpb.MatchString_MatchStringType_name[int32(namingpb.MatchString_REGEX)] { + matcherType = namingpb.MatchString_REGEX + } + namingRule.Labels[key] = &namingpb.MatchString{ + Type: matcherType, + Value: &wrappers.StringValue{Value: matcher.Value}, + } + } + } + totalRule.Rules = append(totalRule.Rules, namingRule) + } + values := make(map[model.ServiceKey]model.ServiceRule, len(svcRules)) + for svcKey, totalRule := range svcRules { + respInProto := &namingpb.DiscoverResponse{ + Type: namingpb.DiscoverResponse_RATE_LIMIT, + RateLimit: totalRule, + Service: &namingpb.Service{ + Name: &wrappers.StringValue{Value: svcKey.Service}, + Namespace: &wrappers.StringValue{Value: svcKey.Namespace}, + }, + } + svcRule := pb.NewServiceRuleInProto(respInProto) + if err := svcRule.ValidateAndBuildCache(); nil != err { + return nil, errors.New(fmt.Sprintf("fail to validate config local rule, err is %v", err)) + } + values[svcKey] = svcRule + } + return values, nil +} + //获取配额分配窗口集合 func (f *FlowQuotaAssistant) DeleteRateLimitWindowSet(svcKey model.ServiceKey) { f.mutex.Lock() @@ -259,13 +330,31 @@ func (f *FlowQuotaAssistant) lookupRateLimitWindow( var err error // 1. 并发获取被调服务信息和限流配置,服务不存在,返回错误 if err = f.engine.SyncGetResources(commonRequest); nil != err { - return nil, err + sdkErr, ok := err.(model.SDKError) + if !ok { + return nil, err + } + if sdkErr.ErrorCode() != model.ErrCodeServiceNotFound { + return nil, err + } } // 2. 寻找匹配的规则 - rule, err := lookupRule(commonRequest.RateLimitRule, commonRequest.Labels) + var svcRule model.ServiceRule + svcRule = commonRequest.RateLimitRule + var rule *namingpb.Rule + var hasContent bool + hasContent, rule, err = lookupRule(svcRule, commonRequest.Labels) if nil != err { return nil, err } + if !hasContent { + //远程规则没有内容,则匹配本地规则 + svcRule = f.localRules[commonRequest.DstService] + _, rule, err = lookupRule(svcRule, commonRequest.Labels) + if nil != err { + return nil, err + } + } if nil == rule { return nil, nil } @@ -295,20 +384,24 @@ func (f *FlowQuotaAssistant) lookupRateLimitWindow( } // 寻址规则 -func lookupRule(svcRule model.ServiceRule, labels map[string]string) (*namingpb.Rule, error) { +func lookupRule(svcRule model.ServiceRule, labels map[string]string) (bool, *namingpb.Rule, error) { + if reflect2.IsNil(svcRule) { + // 规则集为空 + return false, nil, nil + } if reflect2.IsNil(svcRule.GetValue()) { // 没有配置限流规则 - return nil, nil + return false, nil, nil } validateErr := svcRule.GetValidateError() if nil != validateErr { - return nil, model.NewSDKError(model.ErrCodeInvalidRule, validateErr, + return true, nil, model.NewSDKError(model.ErrCodeInvalidRule, validateErr, "invalid rateLimit rule, please check rule for (namespace=%s, service=%s)", svcRule.GetNamespace(), svcRule.GetService()) } ruleCache := svcRule.GetRuleCache() rateLimiting := svcRule.GetValue().(*namingpb.RateLimit) - return matchRuleByLabels(labels, rateLimiting, ruleCache), nil + return true, matchRuleByLabels(labels, rateLimiting, ruleCache), nil } //通过业务标签来匹配规则 diff --git a/plugin/serverconnector/grpc/operation_rlimit_async.go b/pkg/flow/quota/remote.go similarity index 87% rename from plugin/serverconnector/grpc/operation_rlimit_async.go rename to pkg/flow/quota/remote.go index de2b2bde..63e7ae74 100644 --- a/plugin/serverconnector/grpc/operation_rlimit_async.go +++ b/pkg/flow/quota/remote.go @@ -1,5 +1,5 @@ /** - * Tencent is pleased to support the open source community by making polaris-go available. + * Tencent is pleased to support the open source community by making Polaris available. * * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. * @@ -15,7 +15,7 @@ * specific language governing permissions and limitations under the License. */ -package grpc +package quota import ( "context" @@ -26,23 +26,53 @@ import ( "github.com/polarismesh/polaris-go/pkg/log" "github.com/polarismesh/polaris-go/pkg/model" rlimitV2 "github.com/polarismesh/polaris-go/pkg/model/pb/metric/v2" - "github.com/polarismesh/polaris-go/pkg/network" - "github.com/polarismesh/polaris-go/pkg/plugin/serverconnector" - connector "github.com/polarismesh/polaris-go/plugin/serverconnector/common" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "io" + "net" + "strings" "sync" "sync/atomic" "time" ) +//应答回调函数 +type ResponseCallBack interface { + //应答回调函数 + OnInitResponse(counter *rlimitV2.QuotaCounter, duration time.Duration, curTimeMilli int64) + //应答回调函数 + OnReportResponse(counter *rlimitV2.QuotaLeft, duration time.Duration, curTimeMilli int64) +} + +//限流消息同步器 +type RateLimitMsgSender interface { + //是否已经初始化 + HasInitialized(svcKey model.ServiceKey, labels string) bool + //发送初始化请求 + SendInitRequest(request *rlimitV2.RateLimitInitRequest, callback ResponseCallBack) + //发送上报请求 + SendReportRequest(request *rlimitV2.ClientRateLimitReportRequest) error + //同步时间 + AdjustTime() int64 +} + +//异步限流连接器 +type AsyncRateLimitConnector interface { + //初始化限流控制信息 + GetMessageSender(svcKey model.ServiceKey, hashValue uint64) (RateLimitMsgSender, error) + //销毁 + Destroy() + //流数量 + StreamCount() int +} + //头信息带给server真实的IP地址 const headerKeyClientIP = "client-ip" //基于时间段的回调结构 type DurationBaseCallBack struct { record *InitializeRecord - callBack serverconnector.ResponseCallBack + callBack ResponseCallBack duration time.Duration } @@ -51,7 +81,7 @@ type InitializeRecord struct { identifier *CounterIdentifier counterSet *StreamCounterSet counterKeys map[time.Duration]uint32 - callback serverconnector.ResponseCallBack + callback ResponseCallBack initSendTimeMilli int64 lastRecvTimeMilli int64 } @@ -91,7 +121,7 @@ type StreamCounterSet struct { //回调函数 counters map[uint32]*DurationBaseCallBack //连接器 - asyncConnector *AsyncRateLimitConnector + asyncConnector *asyncRateLimitConnector //上一次连接失败的时间点 lastConnectFailTimeMilli int64 //创建时间 @@ -103,7 +133,7 @@ type StreamCounterSet struct { } //新建流管理器 -func NewStreamCounterSet(asyncConnector *AsyncRateLimitConnector, identifier *HostIdentifier) *StreamCounterSet { +func NewStreamCounterSet(asyncConnector *asyncRateLimitConnector, identifier *HostIdentifier) *StreamCounterSet { streamCounterSet := &StreamCounterSet{ mutex: &sync.RWMutex{}, asyncConnector: asyncConnector, @@ -158,8 +188,8 @@ func (s *StreamCounterSet) createConnection() (*grpc.ClientConn, error) { } //初始化操作的前置检查 -func (s *StreamCounterSet) preInitCheck(counterIdentifier CounterIdentifier, - callback serverconnector.ResponseCallBack) rlimitV2.RateLimitGRPCV2_ServiceClient { +func (s *StreamCounterSet) preInitCheck( + counterIdentifier CounterIdentifier, callback ResponseCallBack) rlimitV2.RateLimitGRPCV2_ServiceClient { s.mutex.Lock() defer s.mutex.Unlock() if nil == s.conn { @@ -175,15 +205,12 @@ func (s *StreamCounterSet) preInitCheck(counterIdentifier CounterIdentifier, s.client = rlimitV2.NewRateLimitGRPCV2Client(s.conn) } if nil == s.serviceStream { - selfHost := s.asyncConnector.clientInfo.GetIPString() - ctx, cancel := connector.CreateHeaderContext(0, map[string]string{headerKeyClientIP: selfHost}) + selfHost := s.asyncConnector.getIPString(s.HostIdentifier.host, s.HostIdentifier.port) + ctx := createHeaderContext(map[string]string{headerKeyClientIP: selfHost}) serviceStream, err := s.client.Service(ctx) if nil != err { log.GetNetworkLogger().Errorf("[RateLimit]fail to create serviceStream to %s:%d, err is %v", s.HostIdentifier.host, s.HostIdentifier.port, err) - if nil != cancel { - cancel() - } s.conn.Close() return nil } @@ -211,9 +238,15 @@ func (s *StreamCounterSet) preInitCheck(counterIdentifier CounterIdentifier, return s.serviceStream } +func createHeaderContext(headers map[string]string) context.Context { + md := metadata.New(headers) + var ctx context.Context + ctx = context.Background() + return metadata.NewOutgoingContext(ctx, md) +} + //发送初始化请求 -func (s *StreamCounterSet) SendInitRequest( - initReq *rlimitV2.RateLimitInitRequest, callback serverconnector.ResponseCallBack) { +func (s *StreamCounterSet) SendInitRequest(initReq *rlimitV2.RateLimitInitRequest, callback ResponseCallBack) { counterIdentifier := CounterIdentifier{ service: initReq.GetTarget().GetService(), namespace: initReq.GetTarget().GetNamespace(), @@ -544,7 +577,7 @@ func (c CounterIdentifier) String() string { } // 目前只实现了 RateLimit-Acquire的异步 和 metric-report的异步 -type AsyncRateLimitConnector struct { +type asyncRateLimitConnector struct { //读写锁,守护streams列表 mutex *sync.RWMutex //IP端口到Stream的映射,一个IP端口只有一个stream @@ -555,8 +588,10 @@ type AsyncRateLimitConnector struct { valueCtx model.ValueContext //单次加载 once *sync.Once + //获取自身IP的互斥锁 + clientHostMutex *sync.Mutex //自身IP信息 - clientInfo *network.ClientInfo + clientHost string //连接超时时间 connTimeout time.Duration //消息超时时间 @@ -574,32 +609,30 @@ type AsyncRateLimitConnector struct { } // NewAsyncRateLimitConnector -func NewAsyncRateLimitConnector( - valueCtx model.ValueContext, clientInfo *network.ClientInfo, cfg config.Configuration) *AsyncRateLimitConnector { +func NewAsyncRateLimitConnector(valueCtx model.ValueContext, cfg config.Configuration) AsyncRateLimitConnector { connTimeout := cfg.GetGlobal().GetServerConnector().GetConnectTimeout() msgTimeout := cfg.GetGlobal().GetServerConnector().GetMessageTimeout() protocol := cfg.GetGlobal().GetServerConnector().GetProtocol() purgeInterval := cfg.GetProvider().GetRateLimit().GetPurgeInterval() connIdleTimeout := cfg.GetGlobal().GetServerConnector().GetConnectionIdleTimeout() reconnectInterval := cfg.GetGlobal().GetServerConnector().GetReconnectInterval() - connector := &AsyncRateLimitConnector{ + return &asyncRateLimitConnector{ mutex: &sync.RWMutex{}, streams: make(map[HostIdentifier]*StreamCounterSet), valueCtx: valueCtx, - clientInfo: clientInfo, connTimeout: connTimeout, msgTimeout: msgTimeout, purgeInterval: purgeInterval, connIdleTimeout: connIdleTimeout, reconnectInterval: reconnectInterval, once: &sync.Once{}, + clientHostMutex: &sync.Mutex{}, protocol: protocol, } - return connector } //淘汰流管理器 -func (a *AsyncRateLimitConnector) dropStreamCounterSet( +func (a *asyncRateLimitConnector) dropStreamCounterSet( streamCounterSet *StreamCounterSet, serviceStream rlimitV2.RateLimitGRPCV2_ServiceClient) { a.mutex.Lock() defer a.mutex.Unlock() @@ -611,7 +644,7 @@ func (a *AsyncRateLimitConnector) dropStreamCounterSet( } //获取流计数器 -func (a *AsyncRateLimitConnector) getStreamCounterSet(hostIdentifier HostIdentifier) (*StreamCounterSet, error) { +func (a *asyncRateLimitConnector) getStreamCounterSet(hostIdentifier HostIdentifier) (*StreamCounterSet, error) { a.mutex.RLock() defer a.mutex.RUnlock() if a.destroyed { @@ -621,7 +654,7 @@ func (a *AsyncRateLimitConnector) getStreamCounterSet(hostIdentifier HostIdentif } //定时处理过期任务 -func (a *AsyncRateLimitConnector) Process( +func (a *asyncRateLimitConnector) Process( taskKey interface{}, taskValue interface{}, lastProcessTime time.Time) model.TaskResult { counterSet := taskValue.(*StreamCounterSet) nowMilli := model.CurrentMillisecond() @@ -646,20 +679,20 @@ func (a *AsyncRateLimitConnector) Process( } //OnTaskEvent 任务事件回调 -func (a *AsyncRateLimitConnector) OnTaskEvent(event model.TaskEvent) { +func (a *asyncRateLimitConnector) OnTaskEvent(event model.TaskEvent) { } //获取stream的数量,用于测试 -func (a *AsyncRateLimitConnector) StreamCount() int { +func (a *asyncRateLimitConnector) StreamCount() int { a.mutex.RLock() defer a.mutex.RUnlock() return len(a.streams) } //创建流上下文 -func (a *AsyncRateLimitConnector) GetMessageSender( - svcKey model.ServiceKey, hashValue uint64) (serverconnector.RateLimitMsgSender, error) { +func (a *asyncRateLimitConnector) GetMessageSender( + svcKey model.ServiceKey, hashValue uint64) (RateLimitMsgSender, error) { req := &model.GetOneInstanceRequest{} req.Service = svcKey.Service req.Namespace = svcKey.Namespace @@ -703,8 +736,25 @@ func (a *AsyncRateLimitConnector) GetMessageSender( return counterSet, nil } +func (a *asyncRateLimitConnector) getIPString(remoteHost string, remotePort uint32) string { + a.clientHostMutex.Lock() + defer a.clientHostMutex.Unlock() + if len(a.clientHost) > 0 { + return a.clientHost + } + addr := fmt.Sprintf("%s:%d", remoteHost, remotePort) + conn, err := net.Dial("tcp", addr) + if nil != err { + log.GetNetworkLogger().Errorf("fail to dial %s to get local host, err is %v", err) + return "" + } + localAddr := conn.LocalAddr().String() + a.clientHost = strings.Split(localAddr, ":")[0] + return a.clientHost +} + //清理 -func (a *AsyncRateLimitConnector) Destroy() { +func (a *asyncRateLimitConnector) Destroy() { a.mutex.Lock() a.destroyed = true streams := a.streams diff --git a/pkg/flow/quota/ticker.go b/pkg/flow/quota/ticker.go index 4e6e5d7a..9c1fc891 100644 --- a/pkg/flow/quota/ticker.go +++ b/pkg/flow/quota/ticker.go @@ -25,33 +25,28 @@ import ( "github.com/polarismesh/polaris-go/pkg/model" "github.com/polarismesh/polaris-go/pkg/plugin" "github.com/polarismesh/polaris-go/pkg/plugin/localregistry" - "github.com/polarismesh/polaris-go/pkg/plugin/serverconnector" "time" ) //远程配额查询任务 type RemoteQuotaCallBack struct { registry localregistry.InstancesRegistry - asyncRLimitConnector serverconnector.AsyncRateLimitConnector + asyncRLimitConnector AsyncRateLimitConnector engine model.Engine scalableRand *rand.ScalableRand } //创建查询任务 func NewRemoteQuotaCallback(cfg config.Configuration, supplier plugin.Supplier, - engine model.Engine) (*RemoteQuotaCallBack, error) { + engine model.Engine, connector AsyncRateLimitConnector) (*RemoteQuotaCallBack, error) { registry, err := data.GetRegistry(cfg, supplier) if nil != err { return nil, err } - connector, err := data.GetServerConnector(cfg, supplier) - if nil != err { - return nil, err - } return &RemoteQuotaCallBack{ scalableRand: rand.NewScalableRand(), registry: registry, - asyncRLimitConnector: connector.GetAsyncRateLimitConnector(), + asyncRLimitConnector: connector, engine: engine}, nil } diff --git a/pkg/flow/quota/window.go b/pkg/flow/quota/window.go index 384e604f..e66689a8 100644 --- a/pkg/flow/quota/window.go +++ b/pkg/flow/quota/window.go @@ -28,8 +28,6 @@ import ( "github.com/polarismesh/polaris-go/pkg/plugin" "github.com/polarismesh/polaris-go/pkg/plugin/common" "github.com/polarismesh/polaris-go/pkg/plugin/ratelimiter" - "github.com/polarismesh/polaris-go/pkg/plugin/serverconnector" - "github.com/modern-go/reflect2" "strings" "sync" "sync/atomic" @@ -433,12 +431,6 @@ func (r *RateLimitWindow) buildRemoteConfigMode(windowSet *RateLimitWindowSet, r } r.remoteCluster.Namespace = rule.GetCluster().GetNamespace().GetValue() r.remoteCluster.Service = rule.GetCluster().GetService().GetValue() - if len(r.remoteCluster.Namespace) == 0 || len(r.remoteCluster.Service) == 0 { - if !reflect2.IsNil(windowSet.flowAssistant.remoteClusterByConfig) { - r.remoteCluster.Namespace = windowSet.flowAssistant.remoteClusterByConfig.GetNamespace() - r.remoteCluster.Service = windowSet.flowAssistant.remoteClusterByConfig.GetService() - } - } if len(r.remoteCluster.Namespace) == 0 || len(r.remoteCluster.Service) == 0 { r.configMode = model.ConfigQuotaLocalMode } else { @@ -502,9 +494,6 @@ type contextKey struct { //ToString方法 func (k *contextKey) String() string { return "rateLimit context value " + k.name } -//key,用于共享错误信息 -var errKey = &contextKey{name: "ctxError"} - //错误容器,用于传递上下文错误信息 type errContainer struct { err atomic.Value @@ -540,7 +529,7 @@ func (r *RateLimitWindow) Engine() model.Engine { } //获取异步连接器 -func (r *RateLimitWindow) AsyncRateLimitConnector() serverconnector.AsyncRateLimitConnector { +func (r *RateLimitWindow) AsyncRateLimitConnector() AsyncRateLimitConnector { return r.WindowSet.flowAssistant.asyncRateLimitConnector } diff --git a/pkg/network/impl.go b/pkg/network/impl.go index 06af9f48..992325f0 100644 --- a/pkg/network/impl.go +++ b/pkg/network/impl.go @@ -426,7 +426,7 @@ func (c *connectionManager) ReportConnectionDown(connID ConnID) { //已经切换新连接,忽略 return } - if IsAvailableConnection(curConn) { + if nil != curConn && IsAvailableConnection(curConn) { curConn.lazyClose(false) } } diff --git a/pkg/network/manager.go b/pkg/network/manager.go index f98b10e8..6cf56abe 100644 --- a/pkg/network/manager.go +++ b/pkg/network/manager.go @@ -19,10 +19,10 @@ package network import ( "fmt" - "github.com/polarismesh/polaris-go/pkg/config" - "github.com/polarismesh/polaris-go/pkg/model" "github.com/google/uuid" "github.com/modern-go/reflect2" + "github.com/polarismesh/polaris-go/pkg/config" + "github.com/polarismesh/polaris-go/pkg/model" "sync/atomic" "time" ) @@ -71,39 +71,39 @@ func (c *ClientInfo) GetHashKey() []byte { //通用的连接管理器 type ConnectionManager interface { - //设置当前协议的连接创建器 + // SetConnCreator 设置当前协议的连接创建器 SetConnCreator(creator ConnCreator) - //销毁并释放连接管理器 + // Destroy 销毁并释放连接管理器 Destroy() - //获取并占用连接 + // GetConnection 获取并占用连接 GetConnection(opKey string, clusterType config.ClusterType) (*Connection, error) - //通过传入一致性hashKey的方式获取链接 + // GetConnectionByHashKey 通过传入一致性hashKey的方式获取链接 GetConnectionByHashKey(opKey string, clusterType config.ClusterType, hashKey []byte) (*Connection, error) - //报告连接故障 + // ReportConnectionDown 报告连接故障 ReportConnectionDown(connID ConnID) - //上报接口调用成功 + // ReportSuccess 上报接口调用成功 ReportSuccess(connID ConnID, retCode int32, timeout time.Duration) - //上报接口调用失败 + // ReportFail 上报接口调用失败 ReportFail(connID ConnID, retCode int32, timeout time.Duration) - //更新服务地址 + // UpdateServers 更新服务地址 UpdateServers(svcEventKey model.ServiceEventKey) - //获取当前客户端信息 + // GetClientInfo 获取当前客户端信息 GetClientInfo() *ClientInfo - //discover服务是否已经就绪 + // IsReady discover服务是否已经就绪 IsReady() bool - //计算hash Key对应的实例 + // GetHashExpectedInstance 计算hash Key对应的实例 GetHashExpectedInstance(clusterType config.ClusterType, hash []byte) (string, model.Instance, error) - //直接通过addr连接,慎使用 + // ConnectByAddr 直接通过addr连接,慎使用 ConnectByAddr(clusterType config.ClusterType, addr string, instance model.Instance) (*Connection, error) } diff --git a/pkg/plugin/serverconnector/serverconnector.go b/pkg/plugin/serverconnector/serverconnector.go index c721e84c..aa84abf8 100644 --- a/pkg/plugin/serverconnector/serverconnector.go +++ b/pkg/plugin/serverconnector/serverconnector.go @@ -18,13 +18,12 @@ package serverconnector import ( + "github.com/golang/protobuf/proto" "github.com/polarismesh/polaris-go/pkg/config" "github.com/polarismesh/polaris-go/pkg/model" - rlimitV2 "github.com/polarismesh/polaris-go/pkg/model/pb/metric/v2" namingpb "github.com/polarismesh/polaris-go/pkg/model/pb/v1" "github.com/polarismesh/polaris-go/pkg/plugin" "github.com/polarismesh/polaris-go/pkg/plugin/common" - "github.com/golang/protobuf/proto" "time" ) @@ -69,34 +68,6 @@ type MessageCallBack interface { OnResponse(proto.Message) } -//应答回调函数 -type ResponseCallBack interface { - //应答回调函数 - OnInitResponse(counter *rlimitV2.QuotaCounter, duration time.Duration, curTimeMilli int64) - //应答回调函数 - OnReportResponse(counter *rlimitV2.QuotaLeft, duration time.Duration, curTimeMilli int64) -} - -//限流消息同步器 -type RateLimitMsgSender interface { - //是否已经初始化 - HasInitialized(svcKey model.ServiceKey, labels string) bool - //发送初始化请求 - SendInitRequest(request *rlimitV2.RateLimitInitRequest, callback ResponseCallBack) - //发送上报请求 - SendReportRequest(request *rlimitV2.ClientRateLimitReportRequest) error - //同步时间 - AdjustTime() int64 -} - -//异步限流连接器 -type AsyncRateLimitConnector interface { - //初始化限流控制信息 - GetMessageSender(svcKey model.ServiceKey, hashValue uint64) (RateLimitMsgSender, error) - //销毁 - Destroy() -} - //ServerConnector 【扩展点接口】server代理,封装了server对接的逻辑 type ServerConnector interface { plugin.Plugin @@ -119,8 +90,6 @@ type ServerConnector interface { // 更新服务端地址 // 异常场景:当地址列表为空,或者地址全部连接失败,则返回error,调用者需进行重试 UpdateServers(key *model.ServiceEventKey) error - //获取限流server连接器 - GetAsyncRateLimitConnector() AsyncRateLimitConnector } //初始化 diff --git a/plugin/serverconnector/grpc/operation_async.go b/plugin/serverconnector/grpc/operation_async.go index 51c70238..a6c16e87 100644 --- a/plugin/serverconnector/grpc/operation_async.go +++ b/plugin/serverconnector/grpc/operation_async.go @@ -48,7 +48,6 @@ type Connector struct { connManager network.ConnectionManager connectionIdleTimeout time.Duration valueCtx model.ValueContext - asyncRLimitConnector *AsyncRateLimitConnector discoverConnector *connector.DiscoverConnector //有没有打印过connManager ready的信息,用于避免重复打印 hasPrintedReady uint32 @@ -75,7 +74,6 @@ func (g *Connector) Init(ctx *plugin.InitContext) error { g.connManager = ctx.ConnManager g.connectionIdleTimeout = ctx.Config.GetGlobal().GetServerConnector().GetConnectionIdleTimeout() g.valueCtx = ctx.ValueCtx - g.asyncRLimitConnector = NewAsyncRateLimitConnector(g.valueCtx, ctx.ConnManager.GetClientInfo(), ctx.Config) protocol := ctx.Config.GetGlobal().GetServerConnector().GetProtocol() if protocol == g.Name() { log.GetBaseLogger().Infof("set %s plugin as connectionCreator", g.Name()) @@ -116,13 +114,12 @@ func (g *Connector) Destroy() error { return nil } -// enable +// IsEnable func (g *Connector) IsEnable(cfg config.Configuration) bool { if cfg.GetGlobal().GetSystem().GetMode() == model.ModeWithAgent { return false - } else { - return true } + return true } //RegisterServiceHandler 注册服务监听器 @@ -143,10 +140,6 @@ func (g *Connector) UpdateServers(key *model.ServiceEventKey) error { return g.discoverConnector.UpdateServers(key) } -func (g *Connector) GetAsyncRateLimitConnector() serverconnector.AsyncRateLimitConnector { - return g.asyncRLimitConnector -} - //init 注册插件信息 func init() { plugin.RegisterConfigurablePlugin(&Connector{}, &networkConfig{}) diff --git a/polaris.yaml b/polaris.yaml index 7e51d28a..9d97c1dd 100644 --- a/polaris.yaml +++ b/polaris.yaml @@ -96,7 +96,7 @@ global: #描述:是否将统计信息上报至monitor #类型:bool #默认值:true - enable: true + enable: false #描述:启用的统计上报插件类型 #类型:list #范围:已经注册的统计上报插件的名字 diff --git a/sample/features/ratelimit/main.go b/sample/features/ratelimit/main.go new file mode 100644 index 00000000..1c87374a --- /dev/null +++ b/sample/features/ratelimit/main.go @@ -0,0 +1,171 @@ +/** + * Tencent is pleased to support the open source community by making CL5 available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package main + +import ( + "flag" + "fmt" + "github.com/polarismesh/polaris-go/api" + "github.com/polarismesh/polaris-go/pkg/algorithm/rand" + "log" + "os" + "os/signal" + "strings" + "sync" + "sync/atomic" + "time" +) + +//通过的请求数量 +var passReq int64 + +//全量的请求数 +var allReq int64 + +//限流请求 +var quotaReq api.QuotaRequest + +//limitApi +var limitAPI api.LimitAPI + +//每个线程进行限流请求的间隔 +var sleepWindow []time.Duration + +var ( + namespace string + service string + labels string + //并发数 + concurrency int + //多久获取一次配额 + interval int +) + +func initArgs() { + flag.StringVar(&namespace, "namespace", "default", "namespace") + flag.StringVar(&service, "service", "TestSvc", "service") + flag.StringVar(&labels, "labels", "", "labels") + flag.IntVar(&concurrency, "concurrency", 1, "concurrency") + flag.IntVar(&interval, "interval", 20, "interval") +} + +//限流模拟程序 +func main() { + initArgs() + flag.Parse() + if len(namespace) == 0 || len(service) == 0 { + log.Print("namespace and service are required") + return + } + labelsMap, err := ParseLabels(labels) + if err != nil { + log.Fatalf("fail to parse labels %s, err: %v", labels, err) + } + log.Printf("labels: %v", labelsMap) + quotaReq = api.NewQuotaRequest() + quotaReq.SetLabels(labelsMap) + quotaReq.SetNamespace(namespace) + quotaReq.SetService(service) + limitAPI, err = api.NewLimitAPI() + if nil != err { + log.Fatalf("fail to create limitAPI, err: %v", err) + } + scalableRand := rand.NewScalableRand() + sleepWindow = make([]time.Duration, concurrency) + for i := 0; i < concurrency; i++ { + factor := scalableRand.Intn(3) + 1 + sleepWindow[i] = time.Duration(factor)*time.Millisecond + time.Duration(interval)*time.Millisecond + } + wg := &sync.WaitGroup{} + wg.Add(concurrency) + stop := make(chan struct{}) + for i := 0; i < concurrency; i++ { + go getQuota(wg, stop, i) + } + //合建chan + c := make(chan os.Signal) + //监听所有信号 + signal.Notify(c) + var previousIntervalPass int64 + var previousAllReq int64 + ticker := time.NewTicker(time.Second) + defer ticker.Stop() +L1: + for { + select { + case <-ticker.C: + currentIntervalPass := atomic.LoadInt64(&passReq) + currentAllReq := atomic.LoadInt64(&allReq) + log.Printf("intervalMs pass: %d, all %d", + currentIntervalPass-previousIntervalPass, currentAllReq-previousAllReq) + previousIntervalPass = currentIntervalPass + previousAllReq = currentAllReq + case s := <-c: + log.Printf("receive quit signal %v", s) + break L1 + } + } + close(stop) + log.Printf("stop closed") + wg.Wait() + log.Printf("total Pass %d, all %d", passReq, allReq) + limitAPI.Destroy() +} + +//进行限流请求的线程执行的函数 +func getQuota(wg *sync.WaitGroup, stop <-chan struct{}, id int) { + log.Printf("thread %d starts, sleep %v", id, sleepWindow[id]) + defer wg.Done() + for { + select { + case <-stop: + log.Printf("thread %d stops", id) + return + default: + result, err := limitAPI.GetQuota(quotaReq) + if nil != err { + log.Fatalf("fail to get quota, err: %v", err) + } + atomic.AddInt64(&allReq, 1) + if result.Get().Code == api.QuotaResultOk { + atomic.AddInt64(&passReq, 1) + } + //每个线程按照间隔时间进行请求 + if sleepWindow[id] > 0 { + time.Sleep(sleepWindow[id]) + } + } + } +} + +//解析标签列表 +func ParseLabels(labelsStr string) (map[string]string, error) { + strLabels := strings.Split(labelsStr, ",") + labels := make(map[string]string, len(strLabels)) + for _, strLabel := range strLabels { + if len(strLabel) == 0 { + continue + } + labelKv := strings.Split(strLabel, ":") + if len(labelKv) != 2 { + return nil, fmt.Errorf("invalid kv pair str %s", strLabel) + } + labels[labelKv[0]] = labelKv[1] + } + return labels, nil +} diff --git a/sample/features/ratelimit/polaris.yaml b/sample/features/ratelimit/polaris.yaml new file mode 100644 index 00000000..233cdf3a --- /dev/null +++ b/sample/features/ratelimit/polaris.yaml @@ -0,0 +1,12 @@ +global: + serverConnector: + addresses: + - 127.0.0.1:8091 +provider: + rateLimit: + enable: true + rules: + - namespace: default + service: TestSvc + maxAmount: 10 + validDuration: 1s diff --git a/test/ratelimit/remote_normal_suite.go b/test/ratelimit/remote_normal_suite.go index 190b7c62..421aa13b 100644 --- a/test/ratelimit/remote_normal_suite.go +++ b/test/ratelimit/remote_normal_suite.go @@ -57,28 +57,6 @@ type IndexResult struct { code model.QuotaResultCode } -// 测试不能设置polaris.metric为限流集群 -func (rt *RemoteNormalTestingSuite) TestNoSetPolarisMetric(c *check.C) { - log.Printf("Start TestNoSetPolarisMetric") - cfg := api.NewConfiguration() - consumer, err := api.NewProviderAPIByConfig(cfg) - _ = consumer - c.Assert(err, check.IsNil) - consumer.Destroy() - - cfg = api.NewConfiguration() - cfg.GetProvider().GetRateLimit().SetRateLimitCluster(config.ServerNamespace, "polaris.metric.test") - consumer, err = api.NewProviderAPIByConfig(cfg) - c.Assert(err, check.IsNil) - consumer.Destroy() - - cfg = api.NewConfiguration() - cfg.GetProvider().GetRateLimit().SetRateLimitCluster(config.ServerNamespace, config.ForbidServerMetricService) - consumer, err = api.NewProviderAPIByConfig(cfg) - fmt.Println(err) - c.Assert(err, check.NotNil) -} - //测试远程精准匹配限流 func (rt *RemoteNormalTestingSuite) TestRemoteTwoDuration(c *check.C) { log.Printf("Start TestRemoteTwoDuration") @@ -270,8 +248,6 @@ func (rt *RemoteNormalTestingSuite) TestRemoteRegexCombineV2(c *check.C) { go func(idx int) { defer wg.Done() cfg := config.NewDefaultConfiguration([]string{mockDiscoverAddress}) - //测试通过SDK来设置集群名,兼容场景 - cfg.GetProvider().GetRateLimit().SetRateLimitCluster(config.ServerNamespace, rateLimitSvcName) limitAPI, err := api.NewLimitAPIByConfig(cfg) c.Assert(err, check.IsNil) defer limitAPI.Destroy() @@ -348,8 +324,6 @@ func (rt *RemoteNormalTestingSuite) TestRemoteShareEqually(c *check.C) { go func(idx int) { defer wg.Done() cfg := config.NewDefaultConfiguration([]string{mockDiscoverAddress}) - //测试通过SDK来设置集群名,兼容场景 - cfg.GetProvider().GetRateLimit().SetRateLimitCluster(config.ServerNamespace, rateLimitSvcName) limitAPI, err := api.NewLimitAPIByConfig(cfg) c.Assert(err, check.IsNil) defer limitAPI.Destroy() @@ -362,7 +336,7 @@ func (rt *RemoteNormalTestingSuite) TestRemoteShareEqually(c *check.C) { map[string]string{"appIdShare": "appShare"}) atomic.AddInt64(&calledCount, 1) curTime := model.CurrentMillisecond() - if curTime - startTime >= 1000 { + if curTime-startTime >= 1000 { //前500ms是上下线,不计算 codeChan <- IndexResult{ index: idx, @@ -409,4 +383,4 @@ func (rt *RemoteNormalTestingSuite) TestRemoteShareEqually(c *check.C) { } c.Assert(allocatedPerSecond >= 170 && allocatedPerSecond <= 260, check.Equals, true) } -} \ No newline at end of file +} diff --git a/test/ratelimit/window_expire_suite.go b/test/ratelimit/window_expire_suite.go index fe831e70..c0ab466b 100644 --- a/test/ratelimit/window_expire_suite.go +++ b/test/ratelimit/window_expire_suite.go @@ -22,8 +22,6 @@ import ( "github.com/polarismesh/polaris-go/api" "github.com/polarismesh/polaris-go/pkg/config" "github.com/polarismesh/polaris-go/pkg/flow" - "github.com/polarismesh/polaris-go/pkg/flow/data" - "github.com/polarismesh/polaris-go/plugin/serverconnector/grpc" "gopkg.in/check.v1" "time" ) @@ -148,9 +146,7 @@ func (rt *WindowExpireTestingSuite) TestUinExpiredRemote(c *check.C) { taskValues := engine.FlowQuotaAssistant().TaskValues() c.Assert(taskValues.Started(), check.Equals, true) - connector, err := data.GetServerConnector(cfg, engine.PluginSupplier()) - c.Assert(err, check.IsNil) - asyncRateLimitConnector := connector.GetAsyncRateLimitConnector().(*grpc.AsyncRateLimitConnector) + asyncRateLimitConnector := engine.FlowQuotaAssistant().AsyncRateLimitConnector() c.Assert(asyncRateLimitConnector.StreamCount(), check.Equals, 1) //等待淘汰 diff --git a/test/testdata/ratelimit_rule_v2/remote_normal.json b/test/testdata/ratelimit_rule_v2/remote_normal.json index 299f682d..f678172b 100644 --- a/test/testdata/ratelimit_rule_v2/remote_normal.json +++ b/test/testdata/ratelimit_rule_v2/remote_normal.json @@ -1,110 +1,114 @@ -{ - "rules": [ - { - "id": "r0001", - "service": "RemoteTestSvcV2", - "namespace": "Test", - "labels": { - "method": { - "type": "EXACT", - "value": "query" - }, - "uin": { - "type": "EXACT", - "value": "007" - } - }, - "resource": "QPS", - "type": "GLOBAL", - "action": "reject", - "amount_mode": "GLOBAL_TOTAL", - "amounts": [ - { - "maxAmount": 200, - "validDuration": "1s" - }, - { - "maxAmount": 800, - "validDuration": "10s" - } - ], - "cluster": { - "namespace": "Polaris", - "service": "polaris.metric.test.ide" - } - }, - { - "id": "r0002", - "service": "RemoteTestSvcV2", - "namespace": "Test", - "labels": { - "appId": { - "type": "REGEX", - "value": ".+" - } - }, - "resource": "QPS", - "type": "GLOBAL", - "action": "reject", - "regex_combine": false, - "amount_mode": "GLOBAL_TOTAL", - "amounts": [ - { - "maxAmount": 100, - "validDuration": "1s" - } - ], - "cluster": { - "namespace": "Polaris", - "service": "polaris.metric.test.ide" - } - }, - { - "id": "r0003", - "service": "RemoteTestSvcV2", - "namespace": "Test", - "labels": { - "test_uin": { - "type": "REGEX", - "value": ".+" - } - }, - "resource": "QPS", - "type": "GLOBAL", - "action": "reject", - "regex_combine": true, - "amount_mode": "GLOBAL_TOTAL", - "amounts": [ - { - "maxAmount": 300, - "validDuration": "1s" - } - ] - }, - { - "id": "r0004", - "service": "RemoteTestSvcV2", - "namespace": "Test", - "labels": { - "appIdShare": { - "type": "EXACT", - "value": "appShare" - } - }, - "resource": "QPS", - "type": "GLOBAL", - "action": "reject", - "amount_mode": "SHARE_EQUALLY", - "amounts": [ - { - "maxAmount": 20, - "validDuration": "1s" - } - ], - "cluster": { - "namespace": "Polaris", - "service": "polaris.metric.test.ide" - } - } - ] +{ + "rules": [ + { + "id": "r0001", + "service": "RemoteTestSvcV2", + "namespace": "Test", + "labels": { + "method": { + "type": "EXACT", + "value": "query" + }, + "uin": { + "type": "EXACT", + "value": "007" + } + }, + "resource": "QPS", + "type": "GLOBAL", + "action": "reject", + "amount_mode": "GLOBAL_TOTAL", + "amounts": [ + { + "maxAmount": 200, + "validDuration": "1s" + }, + { + "maxAmount": 800, + "validDuration": "10s" + } + ], + "cluster": { + "namespace": "Polaris", + "service": "polaris.metric.test.ide" + } + }, + { + "id": "r0002", + "service": "RemoteTestSvcV2", + "namespace": "Test", + "labels": { + "appId": { + "type": "REGEX", + "value": ".+" + } + }, + "resource": "QPS", + "type": "GLOBAL", + "action": "reject", + "regex_combine": false, + "amount_mode": "GLOBAL_TOTAL", + "amounts": [ + { + "maxAmount": 100, + "validDuration": "1s" + } + ], + "cluster": { + "namespace": "Polaris", + "service": "polaris.metric.test.ide" + } + }, + { + "id": "r0003", + "service": "RemoteTestSvcV2", + "namespace": "Test", + "labels": { + "test_uin": { + "type": "REGEX", + "value": ".+" + } + }, + "resource": "QPS", + "type": "GLOBAL", + "action": "reject", + "regex_combine": true, + "amount_mode": "GLOBAL_TOTAL", + "amounts": [ + { + "maxAmount": 300, + "validDuration": "1s" + } + ], + "cluster": { + "namespace": "Polaris", + "service": "polaris.metric.test.ide" + } + }, + { + "id": "r0004", + "service": "RemoteTestSvcV2", + "namespace": "Test", + "labels": { + "appIdShare": { + "type": "EXACT", + "value": "appShare" + } + }, + "resource": "QPS", + "type": "GLOBAL", + "action": "reject", + "amount_mode": "SHARE_EQUALLY", + "amounts": [ + { + "maxAmount": 20, + "validDuration": "1s" + } + ], + "cluster": { + "namespace": "Polaris", + "service": "polaris.metric.test.ide" + } + } + ] } \ No newline at end of file diff --git a/test/testdata/ratelimit_rule_v2/window_expire.json b/test/testdata/ratelimit_rule_v2/window_expire.json index 94fa131a..9e216b49 100644 --- a/test/testdata/ratelimit_rule_v2/window_expire.json +++ b/test/testdata/ratelimit_rule_v2/window_expire.json @@ -1,56 +1,56 @@ -{ - "rules": [ - { - "id": "we0001", - "service": "ExpireTestSvcV2", - "namespace": "Test", - "priority": 0, - "labels": { - "appId": { - "type": "EXACT", - "value": "remote" - }, - "uin": { - "type": "REGEX", - "value": ".+" - } - }, - "resource": "QPS", - "type": "GLOBAL", - "action": "reject", - "amount_mode": "GLOBAL_TOTAL", - "amounts": [ - { - "maxAmount": 200, - "validDuration": "1s" - } - ], - "cluster": { - "namespace": "Polaris", - "service": "polaris.metric.test.ide" - } - }, - { - "id": "we0002", - "service": "ExpireTestSvcV2", - "namespace": "Test", - "priority": 1, - "labels": { - "uin": { - "type": "REGEX", - "value": ".+" - } - }, - "resource": "QPS", - "type": "LOCAL", - "action": "reject", - "amount_mode": "GLOBAL_TOTAL", - "amounts": [ - { - "maxAmount": 50, - "validDuration": "1s" - } - ] - } - ] +{ + "rules": [ + { + "id": "we0001", + "service": "ExpireTestSvcV2", + "namespace": "Test", + "priority": 0, + "labels": { + "appId": { + "type": "EXACT", + "value": "remote" + }, + "uin": { + "type": "REGEX", + "value": ".+" + } + }, + "resource": "QPS", + "type": "GLOBAL", + "action": "reject", + "amount_mode": "GLOBAL_TOTAL", + "amounts": [ + { + "maxAmount": 200, + "validDuration": "1s" + } + ], + "cluster": { + "namespace": "Polaris", + "service": "polaris.metric.test.ide" + } + }, + { + "id": "we0002", + "service": "ExpireTestSvcV2", + "namespace": "Test", + "priority": 1, + "labels": { + "uin": { + "type": "REGEX", + "value": ".+" + } + }, + "resource": "QPS", + "type": "LOCAL", + "action": "reject", + "amount_mode": "GLOBAL_TOTAL", + "amounts": [ + { + "maxAmount": 50, + "validDuration": "1s" + } + ] + } + ] } \ No newline at end of file