Skip to content

Commit

Permalink
remove init to start the scheduling task (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
binbin0325 authored May 27, 2021
1 parent f366398 commit 56cb33f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 65 deletions.
2 changes: 1 addition & 1 deletion clients/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewConfigClient(param vo.NacosClientParam) (iClient config_client.IConfigCl
if err != nil {
return
}
iClient = &config
iClient = config
return
}

Expand Down
98 changes: 46 additions & 52 deletions clients/config_client/config_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,21 @@ import (

type ConfigClient struct {
nacos_client.INacosClient
kmsClient *kms.Client
localConfigs []vo.ConfigParam
mutex sync.Mutex
configProxy ConfigProxy
configCacheDir string
kmsClient *kms.Client
localConfigs []vo.ConfigParam
mutex sync.Mutex
configProxy ConfigProxy
configCacheDir string
currentTaskCount int
cacheMap cache.ConcurrentMap
schedulerMap cache.ConcurrentMap
}

const (
perTaskConfigSize = 3000
executorErrDelay = 5 * time.Second
)

var (
currentTaskCount int
cacheMap = cache.NewConcurrentMap()
schedulerMap = cache.NewConcurrentMap()
)

type cacheData struct {
isInitializing bool
dataId string
Expand All @@ -68,21 +65,21 @@ type cacheData struct {
md5 string
appName string
taskId int
configClient *ConfigClient
}

type cacheDataListener struct {
listener vo.Listener
lastMd5 string
}

func init() {
schedulerMap.Set("root", true)
go delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, "root", listenConfigExecutor())
}
func NewConfigClient(nc nacos_client.INacosClient) (*ConfigClient, error) {
config := &ConfigClient{
cacheMap: cache.NewConcurrentMap(),
schedulerMap: cache.NewConcurrentMap(),
}
config.schedulerMap.Set("root", true)
go config.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, "root", config.listenConfigExecutor())

func NewConfigClient(nc nacos_client.INacosClient) (ConfigClient, error) {
config := ConfigClient{}
config.INacosClient = nc
clientConfig, err := nc.GetClientConfig()
if err != nil {
Expand Down Expand Up @@ -257,27 +254,27 @@ func (client *ConfigClient) CancelListenConfig(param vo.ConfigParam) (err error)
logger.Errorf("[checkConfigInfo.GetClientConfig] failed,err:%+v", err)
return
}
cacheMap.Remove(util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId))
client.cacheMap.Remove(util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId))
logger.Infof("Cancel listen config DataId:%s Group:%s", param.DataId, param.Group)
remakeId := int(math.Ceil(float64(cacheMap.Count()) / float64(perTaskConfigSize)))
if remakeId < currentTaskCount {
remakeCacheDataTaskId(remakeId)
remakeId := int(math.Ceil(float64(client.cacheMap.Count()) / float64(perTaskConfigSize)))
if remakeId < client.currentTaskCount {
client.remakeCacheDataTaskId(remakeId)
}
return err
}

//Remake cache data taskId
func remakeCacheDataTaskId(remakeId int) {
func (client *ConfigClient) remakeCacheDataTaskId(remakeId int) {
for i := 0; i < remakeId; i++ {
count := 0
for _, key := range cacheMap.Keys() {
for _, key := range client.cacheMap.Keys() {
if count == perTaskConfigSize {
break
}
if value, ok := cacheMap.Get(key); ok {
if value, ok := client.cacheMap.Get(key); ok {
cData := value.(cacheData)
cData.taskId = i
cacheMap.Set(key, cData)
client.cacheMap.Set(key, cData)
}
count++
}
Expand All @@ -301,7 +298,7 @@ func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {

key := util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)
var cData cacheData
if v, ok := cacheMap.Get(key); ok {
if v, ok := client.cacheMap.Get(key); ok {
cData = v.(cacheData)
cData.isInitializing = true
} else {
Expand Down Expand Up @@ -329,20 +326,19 @@ func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {
content: content,
md5: md5Str,
cacheDataListener: listener,
taskId: cacheMap.Count() / perTaskConfigSize,
configClient: client,
taskId: client.cacheMap.Count() / perTaskConfigSize,
}
}
cacheMap.Set(key, cData)
client.cacheMap.Set(key, cData)
return
}

//Delay Scheduler
//initialDelay the time to delay first execution
//delay the delay between the termination of one execution and the commencement of the next
func delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func() error) {
func (client *ConfigClient) delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func() error) {
for {
if v, ok := schedulerMap.Get(taskId); ok {
if v, ok := client.schedulerMap.Get(taskId); ok {
if !v.(bool) {
return
}
Expand All @@ -357,39 +353,37 @@ func delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute f
}

//Listen for the configuration executor
func listenConfigExecutor() func() error {
func (client *ConfigClient) listenConfigExecutor() func() error {
return func() error {
listenerSize := cacheMap.Count()
listenerSize := client.cacheMap.Count()
taskCount := int(math.Ceil(float64(listenerSize) / float64(perTaskConfigSize)))

if taskCount > currentTaskCount {
for i := currentTaskCount; i < taskCount; i++ {
schedulerMap.Set(strconv.Itoa(i), true)
go delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), longPulling(i))
if taskCount > client.currentTaskCount {
for i := client.currentTaskCount; i < taskCount; i++ {
client.schedulerMap.Set(strconv.Itoa(i), true)
go client.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), client.longPulling(i))
}
currentTaskCount = taskCount
} else if taskCount < currentTaskCount {
for i := taskCount; i < currentTaskCount; i++ {
if _, ok := schedulerMap.Get(strconv.Itoa(i)); ok {
schedulerMap.Set(strconv.Itoa(i), false)
client.currentTaskCount = taskCount
} else if taskCount < client.currentTaskCount {
for i := taskCount; i < client.currentTaskCount; i++ {
if _, ok := client.schedulerMap.Get(strconv.Itoa(i)); ok {
client.schedulerMap.Set(strconv.Itoa(i), false)
}
}
currentTaskCount = taskCount
client.currentTaskCount = taskCount
}
return nil
}
}

//Long polling listening configuration
func longPulling(taskId int) func() error {
func (client *ConfigClient) longPulling(taskId int) func() error {
return func() error {
var listeningConfigs string
var client *ConfigClient
initializationList := make([]cacheData, 0)
for _, key := range cacheMap.Keys() {
if value, ok := cacheMap.Get(key); ok {
for _, key := range client.cacheMap.Keys() {
if value, ok := client.cacheMap.Get(key); ok {
cData := value.(cacheData)
client = cData.configClient
if cData.taskId == taskId {
if cData.isInitializing {
initializationList = append(initializationList, cData)
Expand Down Expand Up @@ -428,7 +422,7 @@ func longPulling(taskId int) func() error {
}
for _, v := range initializationList {
v.isInitializing = false
cacheMap.Set(util.GetConfigCacheKey(v.dataId, v.group, v.tenant), v)
client.cacheMap.Set(util.GetConfigCacheKey(v.dataId, v.group, v.tenant), v)
}
if len(strings.ToLower(strings.Trim(changed, " "))) == 0 {
logger.Info("[client.ListenConfig] no change")
Expand All @@ -448,7 +442,7 @@ func (client *ConfigClient) callListener(changed, tenant string) {
for _, config := range changedConfigs {
attrs := strings.Split(config, "%02")
if len(attrs) >= 2 {
if value, ok := cacheMap.Get(util.GetConfigCacheKey(attrs[0], attrs[1], tenant)); ok {
if value, ok := client.cacheMap.Get(util.GetConfigCacheKey(attrs[0], attrs[1], tenant)); ok {
cData := value.(cacheData)
content, err := client.getConfigInner(vo.ConfigParam{
DataId: cData.dataId,
Expand All @@ -463,7 +457,7 @@ func (client *ConfigClient) callListener(changed, tenant string) {
if cData.md5 != cData.cacheDataListener.lastMd5 {
go cData.cacheDataListener.listener(tenant, attrs[1], attrs[0], cData.content)
cData.cacheDataListener.lastMd5 = cData.md5
cacheMap.Set(util.GetConfigCacheKey(cData.dataId, cData.group, tenant), cData)
client.cacheMap.Set(util.GetConfigCacheKey(cData.dataId, cData.group, tenant), cData)
}
}
}
Expand Down
15 changes: 3 additions & 12 deletions clients/config_client/config_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var serverConfigsTest = []constant.ServerConfig{serverConfigTest}

var httpAgentTest = mock.MockIHttpAgent{}

func createConfigClientTest() ConfigClient {
func createConfigClientTest() *ConfigClient {
nc := nacos_client.NacosClient{}
nc.SetServerConfig([]constant.ServerConfig{*serverConfigWithOptions})
nc.SetClientConfig(*clientConfigWithOptions)
Expand All @@ -119,16 +119,7 @@ func createConfigClientTest() ConfigClient {
return client
}

func createConfigClientTestWithTenant() ConfigClient {
nc := nacos_client.NacosClient{}
nc.SetServerConfig([]constant.ServerConfig{serverConfigTest})
nc.SetClientConfig(clientConfigTestWithTenant)
nc.SetHttpAgent(&http_agent.HttpAgent{})
client, _ := NewConfigClient(&nc)
return client
}

func createConfigClientHttpTest(mockHttpAgent http_agent.IHttpAgent) ConfigClient {
func createConfigClientHttpTest(mockHttpAgent http_agent.IHttpAgent) *ConfigClient {
nc := nacos_client.NacosClient{}
nc.SetServerConfig([]constant.ServerConfig{serverConfigTest})
nc.SetClientConfig(clientConfigTest)
Expand All @@ -137,7 +128,7 @@ func createConfigClientHttpTest(mockHttpAgent http_agent.IHttpAgent) ConfigClien
return client
}

func createConfigClientHttpTestWithTenant(mockHttpAgent http_agent.IHttpAgent) ConfigClient {
func createConfigClientHttpTestWithTenant(mockHttpAgent http_agent.IHttpAgent) *ConfigClient {
nc := nacos_client.NacosClient{}
nc.SetServerConfig([]constant.ServerConfig{serverConfigTest})
nc.SetClientConfig(clientConfigTestWithTenant)
Expand Down

0 comments on commit 56cb33f

Please sign in to comment.