Skip to content

Commit

Permalink
Merge pull request #41 from chuntaojun/feat_report_client
Browse files Browse the repository at this point in the history
[ISSUE #18] support get location plugin
  • Loading branch information
andrewshan authored Apr 20, 2022
2 parents 2bc61e6 + a7c5a52 commit ddddfe5
Show file tree
Hide file tree
Showing 21 changed files with 691 additions and 38 deletions.
12 changes: 12 additions & 0 deletions pkg/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type GlobalConfig interface {
GetServerConnector() ServerConnectorConfig
// GetStatReporter global.statReporter前缀开头的所有配置项
GetStatReporter() StatReporterConfig
// GetLocation global.location前缀开头的所有配置项
GetLocation() LocationConfig
}

// ConsumerConfig consumer config object
Expand Down Expand Up @@ -185,6 +187,16 @@ type StatReporterConfig interface {
SetChain([]string)
}

// LocationConfig SDK获取自身当前地理位置配置
type LocationConfig interface {
BaseConfig
PluginConfig
// GetProvider 获取地理位置的提供者插件名称
GetProvider() string
// SetProvider 设置地理位置的提供者插件名称
SetProvider(string)
}

// ServerConnectorConfig 与名字服务服务端的连接配置
type ServerConnectorConfig interface {
BaseConfig
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ const (
DefaultMapKeyValueSeparator = ":"
// 默认Map组装str (key:value) 二元组分割符
DefaultMapKVTupleSeparator = "|"
// 默认实例地理位置提供者插件名称
DefaultLocationProvider = ""
)

// 默认埋点server的端口,与上面的IP一一对应
Expand Down Expand Up @@ -394,6 +396,9 @@ func (g *GlobalConfigImpl) Verify() error {
if err = g.StatReporter.Verify(); err != nil {
errs = multierror.Append(errs, err)
}
if err = g.Location.Verify(); err != nil {
errs = multierror.Append(errs, err)
}
return errs
}

Expand All @@ -403,6 +408,7 @@ func (g *GlobalConfigImpl) SetDefault() {
g.ServerConnector.SetDefault()
g.System.SetDefault()
g.StatReporter.SetDefault()
g.Location.SetDefault()
}

// 全局配置初始化
Expand All @@ -414,6 +420,8 @@ func (g *GlobalConfigImpl) Init() {
g.ServerConnector.Init()
g.StatReporter = &StatReporterConfigImpl{}
g.StatReporter.Init()
g.Location = &LocationConfigImpl{}
g.Location.Init()
}

// 初始化ConsumerConfigImpl
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type GlobalConfigImpl struct {
API *APIConfigImpl `yaml:"api" json:"api"`
ServerConnector *ServerConnectorConfigImpl `yaml:"serverConnector" json:"serverConnector"`
StatReporter *StatReporterConfigImpl `yaml:"statReporter" json:"statReporter"`
Location *LocationConfigImpl `yaml:"location" json:"location"`
}

// GetSystem 获取系统配置
Expand All @@ -78,6 +79,10 @@ func (g *GlobalConfigImpl) GetStatReporter() StatReporterConfig {
return g.StatReporter
}

func (g *GlobalConfigImpl) GetLocation() LocationConfig {
return g.Location
}

// ConsumerConfigImpl 消费者配置
type ConsumerConfigImpl struct {
LocalCache *LocalCacheConfigImpl `yaml:"localCache" json:"localCache"`
Expand Down
68 changes: 68 additions & 0 deletions pkg/config/location.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Tencent is pleased to support the open source community by making polaris-go available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package config

import "github.com/polarismesh/polaris-go/pkg/plugin/common"

type LocationConfigImpl struct {
Provider string `yaml:"provider" json:"provider"`
// 插件相关配置
Plugin PluginConfigs `yaml:"plugin" json:"plugin"`
}

// GetProvider 获取地理位置的提供者插件名称
func (a *LocationConfigImpl) GetProvider() string {
return a.Provider
}

// SetProvider 设置地理位置的提供者插件名称
func (a *LocationConfigImpl) SetProvider(provider string) {
a.Provider = provider
}

// Init 配置初始化
func (a *LocationConfigImpl) Init() {
a.Plugin = PluginConfigs{}
a.Plugin.Init(common.TypeLocationProvider)
}

// GetPluginConfig consumer.loadbalancer.plugin
func (a *LocationConfigImpl) GetPluginConfig(pluginName string) BaseConfig {
cfgValue, ok := a.Plugin[pluginName]
if !ok {
return nil
}
return cfgValue.(BaseConfig)
}

// SetPluginConfig 输出插件具体配置
func (a *LocationConfigImpl) SetPluginConfig(pluginName string, value BaseConfig) error {
return a.Plugin.SetPluginConfig(common.TypeLocationProvider, pluginName, value)
}

// Verify 检验LocalCacheConfig配置
func (a *LocationConfigImpl) Verify() error {
return nil
}

// SetDefault 设置LocalCacheConfig配置的默认值
func (a *LocationConfigImpl) SetDefault() {
if len(a.Provider) == 0 {
a.Provider = DefaultLocationProvider
}
}
21 changes: 21 additions & 0 deletions pkg/flow/data/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/polarismesh/polaris-go/pkg/plugin/healthcheck"
"github.com/polarismesh/polaris-go/pkg/plugin/loadbalancer"
"github.com/polarismesh/polaris-go/pkg/plugin/localregistry"
"github.com/polarismesh/polaris-go/pkg/plugin/reporthandler"
"github.com/polarismesh/polaris-go/pkg/plugin/serverconnector"
"github.com/polarismesh/polaris-go/pkg/plugin/servicerouter"
"github.com/polarismesh/polaris-go/pkg/plugin/statreporter"
Expand Down Expand Up @@ -156,6 +157,26 @@ func GetLoadBalancerByLbType(lbType string, supplier plugin.Supplier) (loadbalan
return targetPlugin.(loadbalancer.LoadBalancer), nil
}

// GetReportChain 获取ReportClient处理链
func GetReportChain(cfg config.Configuration, supplier plugin.Supplier) (*reporthandler.ReportHandlerChain, error) {
chain := &reporthandler.ReportHandlerChain{
Chain: make([]reporthandler.ReportHandler, 0),
}

pluginNames := supplier.GetPluginsByType(common.TypeReportHandler)

for i := range pluginNames {
name := pluginNames[i]
p, err := supplier.GetPlugin(common.TypeReportHandler, name)
if err != nil {
return nil, err
}
chain.Chain = append(chain.Chain, p.(reporthandler.ReportHandler))
}

return chain, nil
}

// SingleInvoke 同步调用的通用方法定义
type SingleInvoke func(request interface{}) (interface{}, error)

Expand Down
45 changes: 21 additions & 24 deletions pkg/flow/startup/client_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
namingpb "github.com/polarismesh/polaris-go/pkg/model/pb/v1"
"github.com/polarismesh/polaris-go/pkg/plugin"
"github.com/polarismesh/polaris-go/pkg/plugin/localregistry"
"github.com/polarismesh/polaris-go/pkg/plugin/reporthandler"
"github.com/polarismesh/polaris-go/pkg/plugin/serverconnector"
"github.com/polarismesh/polaris-go/pkg/version"
)
Expand All @@ -44,6 +45,9 @@ func NewReportClientCallBack(
if callback.registry, err = data.GetRegistry(cfg, supplier); err != nil {
return nil, err
}
if callback.reportChain, err = data.GetReportChain(cfg, supplier); err != nil {
return nil, err
}
callback.configuration = cfg
callback.globalCtx = globalCtx
callback.interval = cfg.GetGlobal().GetAPI().GetReportInterval()
Expand All @@ -58,6 +62,7 @@ type ReportClientCallBack struct {
configuration config.Configuration
globalCtx model.ValueContext
interval time.Duration
reportChain *reporthandler.ReportHandlerChain
}

const (
Expand All @@ -74,12 +79,13 @@ func (r *ReportClientCallBack) loadLocalClientReportResult() {
log.GetBaseLogger().Warnf("fail to load local region info from %s, err is %v", cachedFile, err)
return
}
location := resp.GetClient().GetLocation()
r.updateLocation(&model.Location{
Region: location.GetRegion().GetValue(),
Zone: location.GetZone().GetValue(),
Campus: location.GetCampus().GetValue(),
}, nil)

client := resp.Client

for i := range r.reportChain.Chain {
handler := r.reportChain.Chain[i]
handler.InitLocal(client)
}
}

// reportClientRequest 客户端上报的请求
Expand Down Expand Up @@ -113,31 +119,22 @@ func (r *ReportClientCallBack) Process(
reportClientResp, err := r.connector.ReportClient(reportClientReq)
if err != nil {
log.GetBaseLogger().Errorf("report client info:%+v, error:%v", reportClientReq, err)
r.updateLocation(nil, err.(model.SDKError))
// 发生错误也要重试,直到获取到地域信息为止
for i := range r.reportChain.Chain {
handler := r.reportChain.Chain[i]
handler.HandleResponse(reportClientResp, err)
}
return model.CONTINUE
}
r.updateLocation(&model.Location{
Region: reportClientResp.Region,
Zone: reportClientResp.Zone,
Campus: reportClientResp.Campus,
}, nil)

for i := range r.reportChain.Chain {
handler := r.reportChain.Chain[i]
handler.HandleResponse(reportClientResp, nil)
}
return model.CONTINUE
}

// OnTaskEvent 任务事件回调
func (r *ReportClientCallBack) OnTaskEvent(event model.TaskEvent) {

}

// updateLocation 更新区域属性
func (r *ReportClientCallBack) updateLocation(location *model.Location, lastErr model.SDKError) {
if nil != location {
// 已获取到客户端的地域信息,更新到全局上下文
log.GetBaseLogger().Infof("current client area info is {Region:%s, Zone:%s, Campus:%s}",
location.Region, location.Zone, location.Campus)
}
if r.globalCtx.SetCurrentLocation(location, lastErr) {
log.GetBaseLogger().Infof("client area info is ready")
}
}
6 changes: 6 additions & 0 deletions pkg/flow/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ func (e *Engine) SyncRegister(instance *model.InstanceRegisterRequest) (*model.I
// 方法开始时间
startTime := e.globalCtx.Now()
svcKey := model.ServiceKey{Namespace: instance.Namespace, Service: instance.Service}

// 如果注册请求没有设置 Location 信息,则由内部自动设置
if instance.Location == nil {
instance.Location = e.globalCtx.GetCurrentLocation().GetLocation()
}

resp, err := data.RetrySyncCall("register", &svcKey, instance, func(request interface{}) (interface{}, error) {
return e.connector.RegisterInstance(request.(*model.InstanceRegisterRequest))
}, param)
Expand Down
12 changes: 12 additions & 0 deletions pkg/model/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,8 @@ type InstanceRegisterRequest struct {
// ttl超时时间,如果节点要调用heartbeat上报,则必须填写,否则会400141错误码,单位:秒
TTL *int

Location *Location

// 可选,单次查询超时时间,默认直接获取全局的超时配置
// 用户总最大超时时间为(1+RetryCount) * Timeout
Timeout *time.Duration
Expand Down Expand Up @@ -1157,6 +1159,11 @@ func (g *InstanceRegisterRequest) SetTTL(ttl int) {
g.TTL = &ttl
}

// SetLocation 设置服务实例的地理信息
func (g *InstanceRegisterRequest) SetLocation(loc *Location) {
g.Location = loc
}

// GetTimeoutPtr 获取超时值指针
func (g *InstanceRegisterRequest) GetTimeoutPtr() *time.Duration {
return g.Timeout
Expand All @@ -1167,6 +1174,11 @@ func (g *InstanceRegisterRequest) GetRetryCountPtr() *int {
return g.RetryCount
}

// GetLocation 获取实例的地址信息
func (g *InstanceRegisterRequest) GetLocation() *Location {
return g.Location
}

// validateMetadata 校验元数据的key是否为空
func validateMetadata(prefix string, metadata map[string]string) error {
if len(metadata) > 0 {
Expand Down
32 changes: 20 additions & 12 deletions pkg/plugin/common/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,27 @@ const (
TypeRateLimiter Type = 0x1010
// TypeSubScribe .
TypeSubScribe Type = 0x1011
// TypeLocationProvider 实例地理信息获取扩展点
TypeLocationProvider Type = 0x1012
// TypeReportHandler ReportClient 请求、响应处理器
TypeReportHandler Type = 0x1013
)

var typeToPresent = map[Type]string{
TypePluginBase: "TypePluginBase",
TypeServerConnector: "serverConnector",
TypeLocalRegistry: "localRegistry",
TypeServiceRouter: "serviceRouter",
TypeLoadBalancer: "loadBalancer",
TypeHealthCheck: "healthChecker",
TypeCircuitBreaker: "circuitBreaker",
TypeWeightAdjuster: "weightAdjuster",
TypeStatReporter: "statReporter",
TypeAlarmReporter: "alarmReporter",
TypeRateLimiter: "rateLimiter",
TypeSubScribe: "subScribe",
TypePluginBase: "TypePluginBase",
TypeServerConnector: "serverConnector",
TypeLocalRegistry: "localRegistry",
TypeServiceRouter: "serviceRouter",
TypeLoadBalancer: "loadBalancer",
TypeHealthCheck: "healthChecker",
TypeCircuitBreaker: "circuitBreaker",
TypeWeightAdjuster: "weightAdjuster",
TypeStatReporter: "statReporter",
TypeAlarmReporter: "alarmReporter",
TypeRateLimiter: "rateLimiter",
TypeSubScribe: "subScribe",
TypeLocationProvider: "locationProvider",
TypeReportHandler: "reportHandler",
}

// ToString方法
Expand Down Expand Up @@ -235,4 +241,6 @@ var LoadedPluginTypes = []Type{
TypeLocalRegistry,
TypeRateLimiter,
TypeSubScribe,
TypeLocationProvider,
TypeReportHandler,
}
Loading

0 comments on commit ddddfe5

Please sign in to comment.