Skip to content

Commit

Permalink
feat: nacos服务发现灰度
Browse files Browse the repository at this point in the history
- 节点上报时metadata对节点打标
- 节点查询时根据当前节点情况过滤实例列表
  • Loading branch information
如漫 committed Apr 11, 2024
1 parent 6dd8997 commit a15babc
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 2 deletions.
66 changes: 66 additions & 0 deletions clients/naming_client/naming_grpc/naming_grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package naming_grpc

import (
"context"
"os"
"time"

"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_cache"
Expand Down Expand Up @@ -94,6 +95,21 @@ func (proxy *NamingGrpcProxy) requestToServer(request rpc_request.IRequest) (rpc
func (proxy *NamingGrpcProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) {
logger.Infof("register instance namespaceId:<%s>,serviceName:<%s> with instance:<%s>",
proxy.clientConfig.NamespaceId, serviceName, util.ToJsonString(instance))

tag := os.Getenv("ALICLOUD_SERVICE_TAG")
if tag == "" {
tag = "base"
}

//添加节点标签
if instance.Metadata == nil {
instance.Metadata = map[string]string{
"alicloud.service.tag": tag,
}
} else {
instance.Metadata["alicloud.service.tag"] = tag
}

proxy.eventListener.CacheInstanceForRedo(serviceName, groupName, instance)
instanceRequest := rpc_request.NewInstanceRequest(proxy.clientConfig.NamespaceId, serviceName, groupName, "registerInstance", instance)
response, err := proxy.requestToServer(instanceRequest)
Expand All @@ -107,6 +123,22 @@ func (proxy *NamingGrpcProxy) RegisterInstance(serviceName string, groupName str
func (proxy *NamingGrpcProxy) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) (bool, error) {
logger.Infof("batch register instance namespaceId:<%s>,serviceName:<%s> with instance:<%s>",
proxy.clientConfig.NamespaceId, serviceName, util.ToJsonString(instances))

tag := os.Getenv("ALICLOUD_SERVICE_TAG")
if tag == "" {
tag = "base"
}
for _, instance := range instances {
//添加节点标签
if instance.Metadata == nil {
instance.Metadata = map[string]string{
"alicloud.service.tag": tag,
}
} else {
instance.Metadata["alicloud.service.tag"] = tag
}
}

proxy.eventListener.CacheInstancesForRedo(serviceName, groupName, instances)
batchInstanceRequest := rpc_request.NewBatchInstanceRequest(proxy.clientConfig.NamespaceId, serviceName, groupName, "batchRegisterInstance", instances)
response, err := proxy.requestToServer(batchInstanceRequest)
Expand Down Expand Up @@ -185,6 +217,40 @@ func (proxy *NamingGrpcProxy) Subscribe(serviceName, groupName string, clusters
return model.Service{}, err
}
subscribeServiceResponse := response.(*rpc_response.SubscribeServiceResponse)

//过滤不符合当前节点标签的实例
if len(subscribeServiceResponse.ServiceInfo.Hosts) != 0 {
tag := os.Getenv("ALICLOUD_SERVICE_TAG")
tagMapList := make([]model.Instance, 0)
backUpMapList := make([]model.Instance, 0)

for _, host := range subscribeServiceResponse.ServiceInfo.Hosts {
// 如果没有metadata, 认为是普通实例
if host.Metadata == nil {
backUpMapList = append(backUpMapList, host)
continue
}

instanceTag, ok := host.Metadata["alicloud.service.tag"]
if !ok || instanceTag == "base" || instanceTag == "" { //普通节点,加入到backUp列表中
backUpMapList = append(backUpMapList, host)
continue
}

if instanceTag == tag { // 节点标签和当前标签能够匹配,直接加入列表
tagMapList = append(tagMapList, host)
} else if (tag == "base" && instanceTag == "") || (tag == "" && instanceTag == "base") || (tag == "" && instanceTag == "") { //兼容普通节点的情况
tagMapList = append(tagMapList, host)
}
}

if len(tagMapList) != 0 {
subscribeServiceResponse.ServiceInfo.Hosts = tagMapList
} else if len(backUpMapList) != 0 {
subscribeServiceResponse.ServiceInfo.Hosts = backUpMapList
}
}

return subscribeServiceResponse.ServiceInfo, nil
}

Expand Down
16 changes: 16 additions & 0 deletions clients/naming_client/naming_http/naming_http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package naming_http
import (
"context"
"net/http"
"os"
"strconv"
"time"

Expand Down Expand Up @@ -62,6 +63,21 @@ func NewNamingHttpProxy(ctx context.Context, clientCfg constant.ClientConfig, na
func (proxy *NamingHttpProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) {
logger.Infof("register instance namespaceId:<%s>,serviceName:<%s> with instance:<%s>",
proxy.clientConfig.NamespaceId, serviceName, util.ToJsonString(instance))

tag := os.Getenv("ALICLOUD_SERVICE_TAG")
if tag == "" {
tag = "base"
}

//添加节点标签
if instance.Metadata == nil {
instance.Metadata = map[string]string{
"alicloud.service.tag": tag,
}
} else {
instance.Metadata["alicloud.service.tag"] = tag
}

serviceName = util.GetGroupName(serviceName, groupName)
params := map[string]string{}
params["namespaceId"] = proxy.clientConfig.NamespaceId
Expand Down
3 changes: 1 addition & 2 deletions clients/naming_client/naming_proxy_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package naming_client

import (
"context"
"github.com/nacos-group/nacos-sdk-go/v2/inner/uuid"

"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_cache"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_grpc"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_http"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_proxy"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/common/http_agent"
"github.com/nacos-group/nacos-sdk-go/v2/common/nacos_server"
"github.com/nacos-group/nacos-sdk-go/v2/inner/uuid"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/util"
)
Expand Down

0 comments on commit a15babc

Please sign in to comment.