Skip to content

Commit

Permalink
Merge pull request nacos-group#2 from nacos-group/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
lzp0412 authored Jul 14, 2020
2 parents 3dc1d05 + a3c7614 commit 65c315b
Show file tree
Hide file tree
Showing 22 changed files with 777 additions and 287 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ constant.ClientConfig{
ListenInterval: 30 * 1000, //监听间隔时间,单位毫秒(仅在ConfigClient中有效)
BeatInterval: 5 * 1000, //心跳间隔时间,单位毫秒(仅在ServiceClient中有效)
NamespaceId: "public", //nacos命名空间
Endpoint: "" //获取nacos节点ip的服务地址
Endpoint: "", //获取nacos节点ip的服务地址
CacheDir: "/data/nacos/cache", //缓存目录
LogDIr: "/data/nacos/log", //日志目录
LogDir: "/data/nacos/log", //日志目录
UpdateThreadNum: 20, //更新服务的线程数
NotLoadCacheAtStart: true, //在启动时不读取本地缓存数据,true--不读取,false--读取
UpdateCacheWhenEmpty: true, //当服务列表为空时是否更新本地缓存,true--更新,false--不更新
Expand Down Expand Up @@ -223,4 +223,4 @@ configClient.ListenConfig(vo.ConfigParam{
},
})

```
```
107 changes: 55 additions & 52 deletions clients/config_client/config_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@ import (
"github.com/nacos-group/nacos-sdk-go/common/logger"
"github.com/nacos-group/nacos-sdk-go/common/nacos_error"
"github.com/nacos-group/nacos-sdk-go/common/util"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/utils"
"github.com/nacos-group/nacos-sdk-go/vo"

"github.com/aliyun/alibaba-cloud-sdk-go/services/kms"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
)

type ConfigClient struct {
Expand Down Expand Up @@ -104,7 +102,7 @@ func (client *ConfigClient) decrypt(dataId, content string) (string, error) {
request.CiphertextBlob = content
response, err := client.kmsClient.Decrypt(request)
if err != nil {
return "", errors.New("ksm decrypt failed")
return "", errors.New("kms decrypt failed")
}
content = response.Plaintext
}
Expand Down Expand Up @@ -197,25 +195,31 @@ func (client *ConfigClient) AddConfigToListen(params []vo.ConfigParam) (err erro
return
}

//Cancel Listen Config
func (client *ConfigClient) CancelListenConfig(param *vo.ConfigParam) error {
param.ListenCloseChan <- struct{}{}
log.Printf("Cancel listen config DataId:%s Group:%s", param.DataId, param.Group)
return nil
}

func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {
go func() {
for {
clientConfig, serverConfigs, agent, err := client.sync()
// 创建计时器
var timer *time.Timer
if err == nil {
timer = time.NewTimer(time.Duration(clientConfig.ListenInterval) * time.Millisecond)
select {
case <-param.ListenCloseChan:
return
default:
clientConfig, _ := client.GetClientConfig()
client.listenConfigTask(clientConfig, param)
}
client.listenConfigTask(clientConfig, serverConfigs, agent, param)
<-timer.C

}
}()

return nil
}

func (client *ConfigClient) listenConfigTask(clientConfig constant.ClientConfig,
serverConfigs []constant.ServerConfig, agent http_agent.IHttpAgent, param vo.ConfigParam) {
func (client *ConfigClient) listenConfigTask(clientConfig constant.ClientConfig, param vo.ConfigParam) {
var listeningConfigs string
// 检查&拼接监听参数
client.mutex.Lock()
Expand Down Expand Up @@ -256,23 +260,16 @@ func (client *ConfigClient) listenConfigTask(clientConfig constant.ClientConfig,
params := make(map[string]string)
params[constant.KEY_LISTEN_CONFIGS] = listeningConfigs
var changed string

for _, serverConfig := range client.configProxy.GetServerList() {
path := client.buildBasePath(serverConfig) + "/listener"
changedTmp, err := listen(agent, path, clientConfig.TimeoutMs, clientConfig.ListenInterval, params)
if err == nil {
changedTmp, err := client.configProxy.ListenConfig(params, tenant, clientConfig.AccessKey, clientConfig.SecretKey)
if err == nil {
changed = changedTmp
} else {
if _, ok := err.(*nacos_error.NacosError); ok {
changed = changedTmp
break
} else {
if _, ok := err.(*nacos_error.NacosError); ok {
changed = changedTmp
break
} else {
log.Println("[client.ListenConfig] listen config error:", err.Error())
}
log.Println("[client.ListenConfig] listen config error:", err.Error())
}
}

if strings.ToLower(strings.Trim(changed, " ")) == "" {
log.Println("[client.ListenConfig] no change")
} else {
Expand All @@ -281,32 +278,6 @@ func (client *ConfigClient) listenConfigTask(clientConfig constant.ClientConfig,
}
}

func listen(agent http_agent.IHttpAgent, path string,
timeoutMs uint64, listenInterval uint64,
params map[string]string) (changed string, err error) {
header := map[string][]string{
"Content-Type": {"application/x-www-form-urlencoded"},
"Long-Pulling-Timeout": {strconv.FormatUint(listenInterval, 10)},
}
log.Println("[client.ListenConfig] request url:", path, " ;params:", params, " ;header:", header)
var response *http.Response
response, err = agent.Post(path, header, timeoutMs, params)
if err == nil {
bytes, errRead := ioutil.ReadAll(response.Body)
defer response.Body.Close()
if errRead != nil {
err = errRead
} else {
if response.StatusCode == 200 {
changed = string(bytes)
} else {
err = nacos_error.NewNacosError(strconv.Itoa(response.StatusCode), string(bytes), nil)
}
}
}
return
}

func (client *ConfigClient) updateLocalConfig(changed string, param vo.ConfigParam) {
client.mutex.Lock()
defer client.mutex.Unlock()
Expand Down Expand Up @@ -381,3 +352,35 @@ func (client *ConfigClient) buildBasePath(serverConfig constant.ServerConfig) (b
strconv.FormatUint(serverConfig.Port, 10) + serverConfig.ContextPath + constant.CONFIG_PATH
return
}

func (client *ConfigClient) SearchConfig(param vo.SearchConfigParm) (*model.ConfigPage, error) {
return client.searchConfigInnter(param)
}

func (client *ConfigClient) searchConfigInnter(param vo.SearchConfigParm) (*model.ConfigPage, error) {
if param.Search != "accurate" && param.Search != "blur" {
return nil, errors.New("[client.searchConfigInnter] param.search must be accurate or blur")
}
if param.PageNo <= 0 {
param.PageNo = 1
}
if param.PageSize <= 0 {
param.PageSize = 10
}
clientConfig, _ := client.GetClientConfig()
configItems, err := client.configProxy.SearchConfigProxy(param, clientConfig.NamespaceId, clientConfig.AccessKey, clientConfig.SecretKey)
if err != nil {
log.Printf("[ERROR] search config from server error:%s ", err.Error())
if _, ok := err.(*nacos_error.NacosError); ok {
nacosErr := err.(*nacos_error.NacosError)
if nacosErr.ErrorCode() == "404" {
return nil, errors.New("config not found")
}
if nacosErr.ErrorCode() == "403" {
return nil, errors.New("get config forbidden")
}
}
return nil, err
}
return configItems, nil
}
10 changes: 10 additions & 0 deletions clients/config_client/config_client_interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config_client

import (
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
)

Expand Down Expand Up @@ -40,4 +41,13 @@ type IConfigClient interface {
// group require
// tenant ==>nacos.namespace optional
ListenConfig(params vo.ConfigParam) (err error)

// 搜索配置
// search require search=accurate--精确搜索 search=blur--模糊搜索
// group option
// dataId option
// tenant ==>nacos.namespace optional
// pageNo option
// pageSize option
SearchConfig(param vo.SearchConfigParm) (*model.ConfigPage, error)
}
Loading

0 comments on commit 65c315b

Please sign in to comment.