Skip to content

Commit

Permalink
Merge remote-tracking branch 'nacos_origin/master' into feature/impro…
Browse files Browse the repository at this point in the history
…ve_currentConnect
  • Loading branch information
joy999 committed Nov 20, 2023
2 parents dc731b4 + 28188c7 commit b05fa15
Show file tree
Hide file tree
Showing 38 changed files with 1,815 additions and 148 deletions.
13 changes: 6 additions & 7 deletions clients/cache/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cache

import (
"encoding/json"
"io/ioutil"
"os"
"strconv"

Expand All @@ -42,22 +41,22 @@ func WriteServicesToFile(service *model.Service, cacheKey, cacheDir string) {
}
bytes, _ := json.Marshal(service)
domFileName := GetFileName(cacheKey, cacheDir)
err = ioutil.WriteFile(domFileName, bytes, 0666)
err = os.WriteFile(domFileName, bytes, 0666)
if err != nil {
logger.Errorf("failed to write name cache:%s ,value:%s ,err:%v", domFileName, string(bytes), err)
}
}

func ReadServicesFromFile(cacheDir string) map[string]model.Service {
files, err := ioutil.ReadDir(cacheDir)
files, err := os.ReadDir(cacheDir)
if err != nil {
logger.Errorf("read cacheDir:%s failed!err:%+v", cacheDir, err)
return nil
}
serviceMap := map[string]model.Service{}
for _, f := range files {
fileName := GetFileName(f.Name(), cacheDir)
b, err := ioutil.ReadFile(fileName)
b, err := os.ReadFile(fileName)
if err != nil {
logger.Errorf("failed to read name cache file:%s,err:%v ", fileName, err)
continue
Expand Down Expand Up @@ -87,7 +86,7 @@ func WriteConfigToFile(cacheKey string, cacheDir string, content string) {
}
return
}
err := ioutil.WriteFile(fileName, []byte(content), 0666)
err := os.WriteFile(fileName, []byte(content), 0666)
if err != nil {
logger.Errorf("failed to write config cache:%s ,value:%s ,err:%v", fileName, content, err)
}
Expand All @@ -96,7 +95,7 @@ func WriteConfigToFile(cacheKey string, cacheDir string, content string) {

func ReadConfigFromFile(cacheKey string, cacheDir string) (string, error) {
fileName := GetFileName(cacheKey, cacheDir)
b, err := ioutil.ReadFile(fileName)
b, err := os.ReadFile(fileName)
if err != nil {
logger.Errorf("get config from cache, cacheKey:%s, cacheDir:%s, error:%v ", cacheKey, cacheDir, err)
return "", errors.Errorf("failed to read config cache file:%s, cacheDir:%s, err:%v ", fileName, cacheDir, err)
Expand All @@ -111,7 +110,7 @@ func GetFailover(key, dir string) string {
return ""
}
logger.Warnf("reading failover content from path:%s", filePath)
fileContent, err := ioutil.ReadFile(filePath)
fileContent, err := os.ReadFile(filePath)
if err != nil {
logger.Errorf("fail to read failover content from %s", filePath)
return ""
Expand Down
3 changes: 1 addition & 2 deletions clients/cache/disk_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cache

import (
"fmt"
"io/ioutil"
"os"
"testing"

Expand Down Expand Up @@ -34,5 +33,5 @@ func TestGetFailover(t *testing.T) {

// write file content
func writeFileContent(filepath, content string) error {
return ioutil.WriteFile(filepath, []byte(content), 0666)
return os.WriteFile(filepath, []byte(content), 0666)
}
3 changes: 3 additions & 0 deletions clients/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func setConfig(param vo.NacosClientParam) (iClient nacos_client.INacosClient, er
_ = client.SetServerConfig(nil)
} else {
for i := range param.ServerConfigs {
if param.ServerConfigs[i].Port == 0 {
param.ServerConfigs[i].Port = 8848
}
if param.ServerConfigs[i].GrpcPort == 0 {
param.ServerConfigs[i].GrpcPort = param.ServerConfigs[i].Port + constant.RpcPortOffset
}
Expand Down
115 changes: 84 additions & 31 deletions clients/config_client/config_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ import (
"sync"
"time"

"github.com/aliyun/alibaba-cloud-sdk-go/services/kms"
"github.com/alibabacloud-go/tea/tea"
dkms_api "github.com/aliyun/alibabacloud-dkms-gcs-go-sdk/openapi"
"github.com/nacos-group/nacos-sdk-go/v2/clients/cache"
"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
nacos_inner_encryption "github.com/nacos-group/nacos-sdk-go/v2/common/encryption"
"github.com/nacos-group/nacos-sdk-go/v2/common/filter"
"github.com/nacos-group/nacos-sdk-go/v2/common/logger"
"github.com/nacos-group/nacos-sdk-go/v2/common/monitor"
"github.com/nacos-group/nacos-sdk-go/v2/common/nacos_error"
Expand All @@ -49,7 +52,7 @@ type ConfigClient struct {
ctx context.Context
cancel context.CancelFunc
nacos_client.INacosClient
kmsClient *kms.Client
kmsClient *nacos_inner_encryption.KmsClient
localConfigs []vo.ConfigParam
mutex sync.Mutex
configProxy IConfigProxy
Expand All @@ -66,6 +69,7 @@ type cacheData struct {
group string
content string
contentType string
encryptedDataKey string
tenant string
cacheDataListener *cacheDataListener
md5 string
Expand All @@ -84,12 +88,18 @@ func (cacheData *cacheData) executeListener() {
cacheData.cacheDataListener.lastMd5 = cacheData.md5
cacheData.configClient.cacheMap.Set(util.GetConfigCacheKey(cacheData.dataId, cacheData.group, cacheData.tenant), *cacheData)

decryptedContent, err := cacheData.configClient.decrypt(cacheData.dataId, cacheData.content)
if err != nil {
logger.Errorf("decrypt content fail ,dataId=%s,group=%s,tenant=%s,err:%+v ", cacheData.dataId,
param := &vo.ConfigParam{
DataId: cacheData.dataId,
Content: cacheData.content,
EncryptedDataKey: cacheData.encryptedDataKey,
UsageType: vo.ResponseType,
}
if err := filter.GetDefaultConfigFilterChainManager().DoFilters(param); err != nil {
logger.Errorf("do filters failed ,dataId=%s,group=%s,tenant=%s,err:%+v ", cacheData.dataId,
cacheData.group, cacheData.tenant, err)
return
}
decryptedContent := param.Content
go cacheData.cacheDataListener.listener(cacheData.tenant, cacheData.group, cacheData.dataId, decryptedContent)
}

Expand Down Expand Up @@ -121,7 +131,15 @@ func NewConfigClient(nc nacos_client.INacosClient) (*ConfigClient, error) {
}

if clientConfig.OpenKMS {
kmsClient, err := kms.NewClientWithAccessKey(clientConfig.RegionId, clientConfig.AccessKey, clientConfig.SecretKey)
var kmsClient *nacos_inner_encryption.KmsClient
switch clientConfig.KMSVersion {
case constant.KMSv1, constant.DEFAULT_KMS_VERSION:
kmsClient, err = initKmsV1Client(clientConfig)
case constant.KMSv3:
kmsClient, err = initKmsV3Client(clientConfig)
default:
err = fmt.Errorf("init kms client failed. unknown kms version:%s\n", clientConfig.KMSVersion)
}
if err != nil {
return nil, err
}
Expand All @@ -144,48 +162,57 @@ func initLogger(clientConfig constant.ClientConfig) error {
return logger.InitLogger(logger.BuildLoggerConfig(clientConfig))
}

func initKmsV1Client(clientConfig constant.ClientConfig) (*nacos_inner_encryption.KmsClient, error) {
return nacos_inner_encryption.InitDefaultKmsV1ClientWithAccessKey(clientConfig.RegionId, clientConfig.AccessKey, clientConfig.SecretKey)
}

func initKmsV3Client(clientConfig constant.ClientConfig) (*nacos_inner_encryption.KmsClient, error) {
return nacos_inner_encryption.InitDefaultKmsV3ClientWithConfig(&dkms_api.Config{
Protocol: tea.String("https"),
Endpoint: tea.String(clientConfig.KMSv3Config.Endpoint),
ClientKeyContent: tea.String(clientConfig.KMSv3Config.ClientKeyContent),
Password: tea.String(clientConfig.KMSv3Config.Password),
}, clientConfig.KMSv3Config.CaContent)
}

func (client *ConfigClient) GetConfig(param vo.ConfigParam) (content string, err error) {
content, err = client.getConfigInner(param)
content, err = client.getConfigInner(&param)
if err != nil {
return "", err
}
return client.decrypt(param.DataId, content)
param.UsageType = vo.ResponseType
if err = filter.GetDefaultConfigFilterChainManager().DoFilters(&param); err != nil {
return "", err
}
content = param.Content
return content, nil
}

func (client *ConfigClient) decrypt(dataId, content string) (string, error) {
var plainContent string
var err error
if client.kmsClient != nil && strings.HasPrefix(dataId, "cipher-") {
request := kms.CreateDecryptRequest()
request.Method = "POST"
request.Scheme = "https"
request.AcceptFormat = "json"
request.CiphertextBlob = content
response, err := client.kmsClient.Decrypt(request)
plainContent, err = client.kmsClient.Decrypt(content)
if err != nil {
return "", fmt.Errorf("kms decrypt failed: %v", err)
}
content = response.Plaintext
}
return content, nil
return plainContent, nil
}

func (client *ConfigClient) encrypt(dataId, content string) (string, error) {
func (client *ConfigClient) encrypt(dataId, content, kmsKeyId string) (string, error) {
var cipherContent string
var err error
if client.kmsClient != nil && strings.HasPrefix(dataId, "cipher-") {
request := kms.CreateEncryptRequest()
request.Method = "POST"
request.Scheme = "https"
request.AcceptFormat = "json"
request.KeyId = "alias/acs/mse" // use default key
request.Plaintext = content
response, err := client.kmsClient.Encrypt(request)
cipherContent, err = client.kmsClient.Encrypt(content, kmsKeyId)
if err != nil {
return "", fmt.Errorf("kms encrypt failed: %v", err)
}
content = response.CiphertextBlob
}
return content, nil
return cipherContent, nil
}

func (client *ConfigClient) getConfigInner(param vo.ConfigParam) (content string, err error) {
func (client *ConfigClient) getConfigInner(param *vo.ConfigParam) (content string, err error) {
if len(param.DataId) <= 0 {
err = errors.New("[client.GetConfig] param.dataId can not be empty")
return "", err
Expand Down Expand Up @@ -220,6 +247,11 @@ func (client *ConfigClient) getConfigInner(param vo.ConfigParam) (content string
logger.Warnf("read config from cache success, dataId=%s, group=%s, namespaceId=%s", param.DataId, param.Group, clientConfig.NamespaceId)
return cacheContent, nil
}
if response != nil && response.Response != nil && !response.IsSuccess() {
return response.Content, errors.New(response.GetMessage())
}
param.EncryptedDataKey = response.EncryptedDataKey
param.Content = response.Content
return response.Content, nil
}

Expand All @@ -236,8 +268,10 @@ func (client *ConfigClient) PublishConfig(param vo.ConfigParam) (published bool,
if len(param.Group) <= 0 {
param.Group = constant.DEFAULT_GROUP
}
if param.Content, err = client.encrypt(param.DataId, param.Content); err != nil {
return

param.UsageType = vo.RequestType
if err = filter.GetDefaultConfigFilterChainManager().DoFilters(&param); err != nil {
return false, err
}

clientConfig, _ := client.GetClientConfig()
Expand All @@ -250,8 +284,11 @@ func (client *ConfigClient) PublishConfig(param vo.ConfigParam) (published bool,
request.AdditionMap["encryptedDataKey"] = param.EncryptedDataKey
rpcClient := client.configProxy.getRpcClient(client)
response, err := client.configProxy.requestProxy(rpcClient, request, constant.DEFAULT_TIMEOUT_MILLS)
if err != nil {
return false, err
}
if response != nil {
return response.IsSuccess(), err
return client.buildResponse(response)
}
return false, err
}
Expand All @@ -270,8 +307,11 @@ func (client *ConfigClient) DeleteConfig(param vo.ConfigParam) (deleted bool, er
request := rpc_request.NewConfigRemoveRequest(param.Group, param.DataId, clientConfig.NamespaceId)
rpcClient := client.configProxy.getRpcClient(client)
response, err := client.configProxy.requestProxy(rpcClient, request, constant.DEFAULT_TIMEOUT_MILLS)
if err != nil {
return false, err
}
if response != nil {
return response.IsSuccess(), err
return client.buildResponse(response)
}
return false, err
}
Expand Down Expand Up @@ -478,8 +518,14 @@ func (client *ConfigClient) refreshContentAndCheck(cacheData cacheData, notify b
cacheData.group, cacheData.tenant)
return
}
if configQueryResponse != nil && configQueryResponse.Response != nil && !configQueryResponse.IsSuccess() {
logger.Errorf("refresh cached config from server error:%v, dataId=%s, group=%s", configQueryResponse.GetMessage(),
cacheData.dataId, cacheData.group)
return
}
cacheData.content = configQueryResponse.Content
cacheData.contentType = configQueryResponse.ContentType
cacheData.encryptedDataKey = configQueryResponse.EncryptedDataKey
if notify {
logger.Infof("[config_rpc_client] [data-received] dataId=%s, group=%s, tenant=%s, md5=%s, content=%s, type=%s",
cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.md5,
Expand Down Expand Up @@ -519,3 +565,10 @@ func (client *ConfigClient) asyncNotifyListenConfig() {
client.listenExecute <- struct{}{}
}()
}

func (client *ConfigClient) buildResponse(response rpc_response.IResponse) (bool, error) {
if response.IsSuccess() {
return response.IsSuccess(), nil
}
return false, errors.New(response.GetMessage())
}
6 changes: 6 additions & 0 deletions clients/naming_client/naming_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (sc *NamingClient) selectInstances(service model.Service, healthy bool) ([]
}
hosts := service.Hosts
var result []model.Instance
logger.Infof("select instances with options: [healthy:<%s>], with service:<%s>", healthy, util.GetGroupName(service.Name, service.GroupName))
for _, host := range hosts {
if host.Healthy == healthy && host.Enable && host.Weight > 0 {
result = append(result, host)
Expand Down Expand Up @@ -342,6 +343,11 @@ func (sc *NamingClient) Unsubscribe(param *vo.SubscribeParam) (err error) {
return err
}

// ServerHealthy ...
func (sc *NamingClient) ServerHealthy() bool {
return sc.serviceProxy.ServerHealthy()
}

// CloseClient ...
func (sc *NamingClient) CloseClient() {
sc.serviceProxy.CloseClient()
Expand Down
3 changes: 3 additions & 0 deletions clients/naming_client/naming_client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ type INamingClient interface {
// GetAllServicesInfo use to get all service info by page
GetAllServicesInfo(param vo.GetAllServiceInfoParam) (model.ServiceList, error)

// ServerHealthy use to check the connectivity to server
ServerHealthy() bool

//CloseClient close the GRPC client
CloseClient()
}
4 changes: 2 additions & 2 deletions clients/naming_client/naming_http/push_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"compress/gzip"
"context"
"encoding/json"
"io/ioutil"
"io"
"math/rand"
"net"
"strconv"
Expand Down Expand Up @@ -168,7 +168,7 @@ func TryDecompressData(data []byte) string {
}

defer reader.Close()
bs, err := ioutil.ReadAll(reader)
bs, err := io.ReadAll(reader)

if err != nil {
logger.Errorf("failed to decompress gzip data,err:%+v", err)
Expand Down
Loading

0 comments on commit b05fa15

Please sign in to comment.