From 7287cf8d974ef3e59d7560fee6c546a97d2cf67b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A6=82=E6=BC=AB?= Date: Thu, 11 Apr 2024 17:44:34 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=B0=83=E6=95=B4=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=97=B6=E8=BF=87=E6=BB=A4=E4=BD=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../naming_cache/service_info_holder.go | 40 +++++++++++++++ .../naming_grpc/naming_grpc_proxy.go | 42 ---------------- example/test_client/main.go | 49 +++++++++++++++++++ 3 files changed, 89 insertions(+), 42 deletions(-) create mode 100644 example/test_client/main.go diff --git a/clients/naming_client/naming_cache/service_info_holder.go b/clients/naming_client/naming_cache/service_info_holder.go index 5ddaf0f6..9d4a54cf 100644 --- a/clients/naming_client/naming_cache/service_info_holder.go +++ b/clients/naming_client/naming_cache/service_info_holder.go @@ -17,6 +17,7 @@ package naming_cache import ( + "fmt" "os" "reflect" "sort" @@ -83,6 +84,45 @@ func (s *ServiceInfoHolder) ProcessService(service *model.Service) { } } + // 灰度节点过滤 + //过滤不符合当前节点标签的实例 + tag := os.Getenv("ALICLOUD_SERVICE_TAG") + if tag == "" { + tag = "base" + } + fmt.Printf("[NamingGrpcProxy.Subscribe] instance tag: %v\n", tag) + tagMapList := make([]model.Instance, 0) //标签节点列表 + backUpMapList := make([]model.Instance, 0) //普通节点列表 + + for _, host := range service.Hosts { + // 如果没有metadata, 认为是普通实例 + if host.Metadata == nil { + backUpMapList = append(backUpMapList, host) + continue + } + + instanceTag, ok := host.Metadata["alicloud.service.tag"] + fmt.Printf("[NamingGrpcProxy.Subscribe] host: %v, metadata : %v\n", host, instanceTag) + if !ok || instanceTag == "base" || instanceTag == "" { //普通节点,加入到backUp列表中 + backUpMapList = append(backUpMapList, host) + continue + } + + if instanceTag == tag { + tagMapList = append(tagMapList, host) + } + } + + if tag != "base" && len(tagMapList) != 0 { + fmt.Printf("[NamingGrpcProxy.Subscribe] change host list, tag: %v, list: %v\n", tag, tagMapList) + service.Hosts = tagMapList + } + if (tag == "base" || tag == "") && len(backUpMapList) != 0 { + fmt.Printf("[NamingGrpcProxy.Subscribe] change host list, tag: %v, list: %v\n", tag, backUpMapList) + service.Hosts = backUpMapList + } + + //继续后续处理 cacheKey := util.GetServiceCacheKey(util.GetGroupName(service.Name, service.GroupName), service.Clusters) oldDomain, ok := s.ServiceInfoMap.Load(cacheKey) if ok && oldDomain.(model.Service).LastRefTime >= service.LastRefTime { diff --git a/clients/naming_client/naming_grpc/naming_grpc_proxy.go b/clients/naming_client/naming_grpc/naming_grpc_proxy.go index 7de07808..d58b3b64 100644 --- a/clients/naming_client/naming_grpc/naming_grpc_proxy.go +++ b/clients/naming_client/naming_grpc/naming_grpc_proxy.go @@ -18,7 +18,6 @@ package naming_grpc import ( "context" - "fmt" "os" "time" @@ -218,47 +217,6 @@ func (proxy *NamingGrpcProxy) Subscribe(serviceName, groupName string, clusters return model.Service{}, err } subscribeServiceResponse := response.(*rpc_response.SubscribeServiceResponse) - - //过滤不符合当前节点标签的实例 - if len(subscribeServiceResponse.ServiceInfo.Hosts) != 0 { - tag := os.Getenv("ALICLOUD_SERVICE_TAG") - if tag == "" { - tag = "base" - } - fmt.Printf("[NamingGrpcProxy.Subscribe] instance tag: %v\n", tag) - tagMapList := make([]model.Instance, 0) //标签节点列表 - backUpMapList := make([]model.Instance, 0) //普通节点列表 - - for _, host := range subscribeServiceResponse.ServiceInfo.Hosts { - // 如果没有metadata, 认为是普通实例 - if host.Metadata == nil { - backUpMapList = append(backUpMapList, host) - continue - } - - instanceTag, ok := host.Metadata["alicloud.service.tag"] - fmt.Printf("[NamingGrpcProxy.Subscribe] host: %v, metadata : %v\n", host, instanceTag) - if !ok || instanceTag == "base" || instanceTag == "" { //普通节点,加入到backUp列表中 - backUpMapList = append(backUpMapList, host) - continue - } - - if instanceTag == tag { - tagMapList = append(tagMapList, host) - } - } - - if tag != "base" && len(tagMapList) != 0 { - fmt.Printf("[NamingGrpcProxy.Subscribe] change host list, tag: %v, list: %v\n", tag, tagMapList) - subscribeServiceResponse.ServiceInfo.Hosts = tagMapList - } - if (tag == "base" || tag == "") && len(backUpMapList) != 0 { - fmt.Printf("[NamingGrpcProxy.Subscribe] change host list, tag: %v, list: %v\n", tag, tagMapList) - subscribeServiceResponse.ServiceInfo.Hosts = backUpMapList - } - } - - fmt.Printf("[NamingGrpcProxy.Subscribe] final service info: %v\n", subscribeServiceResponse.ServiceInfo) return subscribeServiceResponse.ServiceInfo, nil } diff --git a/example/test_client/main.go b/example/test_client/main.go new file mode 100644 index 00000000..90a7a9ae --- /dev/null +++ b/example/test_client/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "fmt" + "github.com/nacos-group/nacos-sdk-go/v2/clients" + "github.com/nacos-group/nacos-sdk-go/v2/common/constant" + "github.com/nacos-group/nacos-sdk-go/v2/vo" + "time" +) + +func main() { + clientConfig := constant.ClientConfig{ + NamespaceId: "public", // 当存在多个 Namespace 时填写对应 Namespace ID,否则使用 public + TimeoutMs: 5000, + NotLoadCacheAtStart: true, + LogDir: "data/nacos/log", + CacheDir: "data/nacos/cache", + LogLevel: "debug", + } + + serverConfig := []constant.ServerConfig{ + { + IpAddr: "mse-a49bd920-p.nacos-ans.mse.aliyuncs.com", // Nacos 服务的IP地址 + Port: 8848, + }, + } + + var err error + namingClient, err := clients.NewNamingClient( + vo.NacosClientParam{ + ServerConfigs: serverConfig, + ClientConfig: &clientConfig}) + if err != nil { + panic(err) + } + + for { + instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{ + ServiceName: "gin-server-a.default.svc.cluster.local", + }) + if err != nil { + fmt.Printf("select one healthy instance err: %v\n", err) + } else { + fmt.Printf("selected instance info: %v\n", instance) + } + + time.Sleep(time.Second * 5) + } +}