From 56cb33f77aaeaa4ce2fbab54f461785feaaecb28 Mon Sep 17 00:00:00 2001 From: "binbin.zhang" Date: Thu, 27 May 2021 10:43:18 +0800 Subject: [PATCH] remove init to start the scheduling task (#251) --- clients/client_factory.go | 2 +- clients/config_client/config_client.go | 98 ++++++++++----------- clients/config_client/config_client_test.go | 15 +--- 3 files changed, 50 insertions(+), 65 deletions(-) diff --git a/clients/client_factory.go b/clients/client_factory.go index cc47ee6a..ad14d245 100644 --- a/clients/client_factory.go +++ b/clients/client_factory.go @@ -48,7 +48,7 @@ func NewConfigClient(param vo.NacosClientParam) (iClient config_client.IConfigCl if err != nil { return } - iClient = &config + iClient = config return } diff --git a/clients/config_client/config_client.go b/clients/config_client/config_client.go index c2aa4523..2c05e156 100644 --- a/clients/config_client/config_client.go +++ b/clients/config_client/config_client.go @@ -40,11 +40,14 @@ import ( type ConfigClient struct { nacos_client.INacosClient - kmsClient *kms.Client - localConfigs []vo.ConfigParam - mutex sync.Mutex - configProxy ConfigProxy - configCacheDir string + kmsClient *kms.Client + localConfigs []vo.ConfigParam + mutex sync.Mutex + configProxy ConfigProxy + configCacheDir string + currentTaskCount int + cacheMap cache.ConcurrentMap + schedulerMap cache.ConcurrentMap } const ( @@ -52,12 +55,6 @@ const ( executorErrDelay = 5 * time.Second ) -var ( - currentTaskCount int - cacheMap = cache.NewConcurrentMap() - schedulerMap = cache.NewConcurrentMap() -) - type cacheData struct { isInitializing bool dataId string @@ -68,7 +65,6 @@ type cacheData struct { md5 string appName string taskId int - configClient *ConfigClient } type cacheDataListener struct { @@ -76,13 +72,14 @@ type cacheDataListener struct { lastMd5 string } -func init() { - schedulerMap.Set("root", true) - go delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, "root", listenConfigExecutor()) -} +func NewConfigClient(nc nacos_client.INacosClient) (*ConfigClient, error) { + config := &ConfigClient{ + cacheMap: cache.NewConcurrentMap(), + schedulerMap: cache.NewConcurrentMap(), + } + config.schedulerMap.Set("root", true) + go config.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, "root", config.listenConfigExecutor()) -func NewConfigClient(nc nacos_client.INacosClient) (ConfigClient, error) { - config := ConfigClient{} config.INacosClient = nc clientConfig, err := nc.GetClientConfig() if err != nil { @@ -257,27 +254,27 @@ func (client *ConfigClient) CancelListenConfig(param vo.ConfigParam) (err error) logger.Errorf("[checkConfigInfo.GetClientConfig] failed,err:%+v", err) return } - cacheMap.Remove(util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)) + client.cacheMap.Remove(util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)) logger.Infof("Cancel listen config DataId:%s Group:%s", param.DataId, param.Group) - remakeId := int(math.Ceil(float64(cacheMap.Count()) / float64(perTaskConfigSize))) - if remakeId < currentTaskCount { - remakeCacheDataTaskId(remakeId) + remakeId := int(math.Ceil(float64(client.cacheMap.Count()) / float64(perTaskConfigSize))) + if remakeId < client.currentTaskCount { + client.remakeCacheDataTaskId(remakeId) } return err } //Remake cache data taskId -func remakeCacheDataTaskId(remakeId int) { +func (client *ConfigClient) remakeCacheDataTaskId(remakeId int) { for i := 0; i < remakeId; i++ { count := 0 - for _, key := range cacheMap.Keys() { + for _, key := range client.cacheMap.Keys() { if count == perTaskConfigSize { break } - if value, ok := cacheMap.Get(key); ok { + if value, ok := client.cacheMap.Get(key); ok { cData := value.(cacheData) cData.taskId = i - cacheMap.Set(key, cData) + client.cacheMap.Set(key, cData) } count++ } @@ -301,7 +298,7 @@ func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) { key := util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId) var cData cacheData - if v, ok := cacheMap.Get(key); ok { + if v, ok := client.cacheMap.Get(key); ok { cData = v.(cacheData) cData.isInitializing = true } else { @@ -329,20 +326,19 @@ func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) { content: content, md5: md5Str, cacheDataListener: listener, - taskId: cacheMap.Count() / perTaskConfigSize, - configClient: client, + taskId: client.cacheMap.Count() / perTaskConfigSize, } } - cacheMap.Set(key, cData) + client.cacheMap.Set(key, cData) return } //Delay Scheduler //initialDelay the time to delay first execution //delay the delay between the termination of one execution and the commencement of the next -func delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func() error) { +func (client *ConfigClient) delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func() error) { for { - if v, ok := schedulerMap.Get(taskId); ok { + if v, ok := client.schedulerMap.Get(taskId); ok { if !v.(bool) { return } @@ -357,39 +353,37 @@ func delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute f } //Listen for the configuration executor -func listenConfigExecutor() func() error { +func (client *ConfigClient) listenConfigExecutor() func() error { return func() error { - listenerSize := cacheMap.Count() + listenerSize := client.cacheMap.Count() taskCount := int(math.Ceil(float64(listenerSize) / float64(perTaskConfigSize))) - if taskCount > currentTaskCount { - for i := currentTaskCount; i < taskCount; i++ { - schedulerMap.Set(strconv.Itoa(i), true) - go delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), longPulling(i)) + if taskCount > client.currentTaskCount { + for i := client.currentTaskCount; i < taskCount; i++ { + client.schedulerMap.Set(strconv.Itoa(i), true) + go client.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), client.longPulling(i)) } - currentTaskCount = taskCount - } else if taskCount < currentTaskCount { - for i := taskCount; i < currentTaskCount; i++ { - if _, ok := schedulerMap.Get(strconv.Itoa(i)); ok { - schedulerMap.Set(strconv.Itoa(i), false) + client.currentTaskCount = taskCount + } else if taskCount < client.currentTaskCount { + for i := taskCount; i < client.currentTaskCount; i++ { + if _, ok := client.schedulerMap.Get(strconv.Itoa(i)); ok { + client.schedulerMap.Set(strconv.Itoa(i), false) } } - currentTaskCount = taskCount + client.currentTaskCount = taskCount } return nil } } //Long polling listening configuration -func longPulling(taskId int) func() error { +func (client *ConfigClient) longPulling(taskId int) func() error { return func() error { var listeningConfigs string - var client *ConfigClient initializationList := make([]cacheData, 0) - for _, key := range cacheMap.Keys() { - if value, ok := cacheMap.Get(key); ok { + for _, key := range client.cacheMap.Keys() { + if value, ok := client.cacheMap.Get(key); ok { cData := value.(cacheData) - client = cData.configClient if cData.taskId == taskId { if cData.isInitializing { initializationList = append(initializationList, cData) @@ -428,7 +422,7 @@ func longPulling(taskId int) func() error { } for _, v := range initializationList { v.isInitializing = false - cacheMap.Set(util.GetConfigCacheKey(v.dataId, v.group, v.tenant), v) + client.cacheMap.Set(util.GetConfigCacheKey(v.dataId, v.group, v.tenant), v) } if len(strings.ToLower(strings.Trim(changed, " "))) == 0 { logger.Info("[client.ListenConfig] no change") @@ -448,7 +442,7 @@ func (client *ConfigClient) callListener(changed, tenant string) { for _, config := range changedConfigs { attrs := strings.Split(config, "%02") if len(attrs) >= 2 { - if value, ok := cacheMap.Get(util.GetConfigCacheKey(attrs[0], attrs[1], tenant)); ok { + if value, ok := client.cacheMap.Get(util.GetConfigCacheKey(attrs[0], attrs[1], tenant)); ok { cData := value.(cacheData) content, err := client.getConfigInner(vo.ConfigParam{ DataId: cData.dataId, @@ -463,7 +457,7 @@ func (client *ConfigClient) callListener(changed, tenant string) { if cData.md5 != cData.cacheDataListener.lastMd5 { go cData.cacheDataListener.listener(tenant, attrs[1], attrs[0], cData.content) cData.cacheDataListener.lastMd5 = cData.md5 - cacheMap.Set(util.GetConfigCacheKey(cData.dataId, cData.group, tenant), cData) + client.cacheMap.Set(util.GetConfigCacheKey(cData.dataId, cData.group, tenant), cData) } } } diff --git a/clients/config_client/config_client_test.go b/clients/config_client/config_client_test.go index ae6c33be..fb731fe6 100644 --- a/clients/config_client/config_client_test.go +++ b/clients/config_client/config_client_test.go @@ -110,7 +110,7 @@ var serverConfigsTest = []constant.ServerConfig{serverConfigTest} var httpAgentTest = mock.MockIHttpAgent{} -func createConfigClientTest() ConfigClient { +func createConfigClientTest() *ConfigClient { nc := nacos_client.NacosClient{} nc.SetServerConfig([]constant.ServerConfig{*serverConfigWithOptions}) nc.SetClientConfig(*clientConfigWithOptions) @@ -119,16 +119,7 @@ func createConfigClientTest() ConfigClient { return client } -func createConfigClientTestWithTenant() ConfigClient { - nc := nacos_client.NacosClient{} - nc.SetServerConfig([]constant.ServerConfig{serverConfigTest}) - nc.SetClientConfig(clientConfigTestWithTenant) - nc.SetHttpAgent(&http_agent.HttpAgent{}) - client, _ := NewConfigClient(&nc) - return client -} - -func createConfigClientHttpTest(mockHttpAgent http_agent.IHttpAgent) ConfigClient { +func createConfigClientHttpTest(mockHttpAgent http_agent.IHttpAgent) *ConfigClient { nc := nacos_client.NacosClient{} nc.SetServerConfig([]constant.ServerConfig{serverConfigTest}) nc.SetClientConfig(clientConfigTest) @@ -137,7 +128,7 @@ func createConfigClientHttpTest(mockHttpAgent http_agent.IHttpAgent) ConfigClien return client } -func createConfigClientHttpTestWithTenant(mockHttpAgent http_agent.IHttpAgent) ConfigClient { +func createConfigClientHttpTestWithTenant(mockHttpAgent http_agent.IHttpAgent) *ConfigClient { nc := nacos_client.NacosClient{} nc.SetServerConfig([]constant.ServerConfig{serverConfigTest}) nc.SetClientConfig(clientConfigTestWithTenant)