Skip to content

Commit

Permalink
fixed memory usage problem caused by maxInt chan (#596)
Browse files Browse the repository at this point in the history
  • Loading branch information
binbin0325 authored Mar 22, 2023
1 parent 03bd50f commit 859cdce
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 31 deletions.
7 changes: 3 additions & 4 deletions clients/cache/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cache

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strconv"
Expand Down Expand Up @@ -100,7 +99,7 @@ func ReadConfigFromFile(cacheKey string, cacheDir string) (string, error) {
b, err := ioutil.ReadFile(fileName)
if err != nil {
logger.Errorf("get config from cache, cacheKey:%s, cacheDir:%s, error:%v ", cacheKey, cacheDir, err)
return "", errors.New(fmt.Sprintf("failed to read config cache file:%s, cacheDir=%s, err:%v ", fileName, cacheDir, err))
return "", errors.Errorf("failed to read config cache file:%s, cacheDir:%s, err:%v ", fileName, cacheDir, err)
}
return string(b), nil
}
Expand All @@ -111,10 +110,10 @@ func GetFailover(key, dir string) string {
if !file.IsExistFile(filePath) {
return ""
}
logger.GetLogger().Warn(fmt.Sprintf("reading failover content from path:%s", filePath))
logger.Warnf("reading failover content from path:%s", filePath)
fileContent, err := ioutil.ReadFile(filePath)
if err != nil {
logger.GetLogger().Error(fmt.Sprintf("fail to read failover content from %s", filePath))
logger.Errorf("fail to read failover content from %s", filePath)
return ""
}
return string(fileContent)
Expand Down
19 changes: 8 additions & 11 deletions clients/config_client/config_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package config_client
import (
"context"
"fmt"
"math"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -134,15 +133,11 @@ func NewConfigClient(nc nacos_client.INacosClient) (*ConfigClient, error) {
if err != nil {
return nil, err
}
config.uid = uid.String()

config.uid = uid.String()
config.cacheMap = cache.NewConcurrentMap()
// maximum buffered queue to prevent chan deadlocks during frequent configuration file updates
// use Math.MaxInt32 to avoid overflows on ARM 32-bits
config.listenExecute = make(chan struct{}, math.MaxInt32)

config.listenExecute = make(chan struct{})
config.startInternal()

return config, err
}

Expand Down Expand Up @@ -204,7 +199,7 @@ func (client *ConfigClient) getConfigInner(param vo.ConfigParam) (content string
cacheKey := util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)
content = cache.GetFailover(cacheKey, client.configCacheDir)
if len(content) > 0 {
logger.GetLogger().Warn(fmt.Sprintf("%s %s %s is using failover content!", clientConfig.NamespaceId, param.Group, param.DataId))
logger.Warnf("%s %s %s is using failover content!", clientConfig.NamespaceId, param.Group, param.DataId)
return content, nil
}
response, err := client.configProxy.queryConfig(param.DataId, param.Group, clientConfig.NamespaceId,
Expand Down Expand Up @@ -464,7 +459,7 @@ func (client *ConfigClient) executeConfigListen() {
}

if hasChangedKeys {
client.notifyListenConfig()
client.asyncNotifyListenConfig()
}
monitor.GetListenConfigCountMonitor().Set(float64(client.cacheMap.Count()))
}
Expand Down Expand Up @@ -501,6 +496,8 @@ func (client *ConfigClient) refreshContentAndCheck(cacheData *cacheData, notify
}
}

func (client *ConfigClient) notifyListenConfig() {
client.listenExecute <- struct{}{}
func (client *ConfigClient) asyncNotifyListenConfig() {
go func() {
client.listenExecute <- struct{}{}
}()
}
2 changes: 1 addition & 1 deletion clients/config_client/config_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (c *ConfigChangeNotifyRequestHandler) RequestReply(request rpc_request.IReq
}
cData := data.(*cacheData)
cData.isSyncWithServer = false
c.client.notifyListenConfig()
c.client.asyncNotifyListenConfig()
return &rpc_response.NotifySubscriberResponse{
Response: &rpc_response.Response{ResultCode: constant.RESPONSE_CODE_SUCCESS},
}
Expand Down
3 changes: 1 addition & 2 deletions clients/naming_client/naming_cache/service_info_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package naming_cache

import (
"fmt"
"os"
"reflect"
"sort"
Expand Down Expand Up @@ -152,7 +151,7 @@ func isServiceInstanceChanged(oldService, newService model.Service) bool {
oldRefTime := oldService.LastRefTime
newRefTime := newService.LastRefTime
if oldRefTime > newRefTime {
logger.Warn(fmt.Sprintf("out of date data received, old-t: %v , new-t: %v", oldRefTime, newRefTime))
logger.Warnf("out of date data received, old-t: %v , new-t: %v", oldRefTime, newRefTime)
return false
}
// sort instance list
Expand Down
5 changes: 5 additions & 0 deletions clients/naming_client/naming_grpc/naming_grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ func (proxy *NamingGrpcProxy) IsSubscribed(serviceName, groupName string, cluste

// Subscribe ...
func (proxy *NamingGrpcProxy) Subscribe(serviceName, groupName string, clusters string) (model.Service, error) {
logger.Infof("Subscribe Service namespaceId:<%s>, serviceName:<%s>, groupName:<%s>, clusters:<%s>",
proxy.clientConfig.NamespaceId, serviceName, groupName, clusters)
proxy.eventListener.CacheSubscriberForRedo(util.GetGroupName(serviceName, groupName), clusters)
request := rpc_request.NewSubscribeServiceRequest(proxy.clientConfig.NamespaceId, serviceName,
groupName, clusters, true)
Expand All @@ -188,12 +190,15 @@ func (proxy *NamingGrpcProxy) Subscribe(serviceName, groupName string, clusters

// Unsubscribe ...
func (proxy *NamingGrpcProxy) Unsubscribe(serviceName, groupName, clusters string) error {
logger.Infof("Unsubscribe Service namespaceId:<%s>, serviceName:<%s>, groupName:<%s>, clusters:<%s>",
proxy.clientConfig.NamespaceId, serviceName, groupName, clusters)
proxy.eventListener.RemoveSubscriberForRedo(util.GetGroupName(serviceName, groupName), clusters)
_, err := proxy.requestToServer(rpc_request.NewSubscribeServiceRequest(proxy.clientConfig.NamespaceId, serviceName, groupName,
clusters, false))
return err
}

func (proxy *NamingGrpcProxy) CloseClient() {
logger.Info("Close Nacos Go SDK Client...")
proxy.rpcClient.GetRpcClient().Shutdown()
}
5 changes: 2 additions & 3 deletions clients/naming_client/naming_http/naming_http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package naming_http

import (
"context"
"fmt"
"net/http"
"strconv"
"time"
Expand Down Expand Up @@ -152,14 +151,14 @@ func (proxy *NamingHttpProxy) GetServiceList(pageNo uint32, pageSize uint32, gro

count, err := jsonparser.GetInt([]byte(result), "count")
if err != nil {
return serviceList, errors.New(fmt.Sprintf("namespaceId:<%s> get service list pageNo:<%d> pageSize:<%d> selector:<%s> from <%s> get 'count' from <%s> error:<%+v>", namespaceId, pageNo, pageSize, util.ToJsonString(selector), groupName, result, err))
return serviceList, errors.Errorf("namespaceId:<%s> get service list pageNo:<%d> pageSize:<%d> selector:<%s> from <%s> get 'count' from <%s> error:<%+v>", namespaceId, pageNo, pageSize, util.ToJsonString(selector), groupName, result, err)
}
var doms []string
_, err = jsonparser.ArrayEach([]byte(result), func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
doms = append(doms, string(value))
}, "doms")
if err != nil {
return serviceList, errors.New(fmt.Sprintf("namespaceId:<%s> get service list pageNo:<%d> pageSize:<%d> selector:<%s> from <%s> get 'doms' from <%s> error:<%+v> ", namespaceId, pageNo, pageSize, util.ToJsonString(selector), groupName, result, err))
return serviceList, errors.Errorf("namespaceId:<%s> get service list pageNo:<%d> pageSize:<%d> selector:<%s> from <%s> get 'doms' from <%s> error:<%+v> ", namespaceId, pageNo, pageSize, util.ToJsonString(selector), groupName, result, err)
}
serviceList.Count = count
serviceList.Doms = doms
Expand Down
3 changes: 1 addition & 2 deletions common/nacos_server/nacos_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
Expand Down Expand Up @@ -187,7 +186,7 @@ func (server *NacosServer) callServer(api string, params map[string]string, meth
if response.StatusCode == constant.RESPONSE_CODE_SUCCESS {
return
} else {
err = errors.New(fmt.Sprintf("request return error code %d", response.StatusCode))
err = errors.Errorf("request return error code %d", response.StatusCode)
return
}
}
Expand Down
3 changes: 1 addition & 2 deletions common/remote/rpc/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"io"
"math"
"os"
"strconv"
"sync"
Expand Down Expand Up @@ -51,7 +50,7 @@ func NewGrpcClient(ctx context.Context, clientName string, nacosServer *nacos_se
name: clientName,
labels: make(map[string]string, 8),
rpcClientStatus: INITIALIZED,
eventChan: make(chan ConnectionEvent, math.MaxInt32),
eventChan: make(chan ConnectionEvent),
reconnectionChan: make(chan ReconnectContext),
nacosServer: nacosServer,
serverRequestHandlerMapping: make(map[string]ServerRequestHandlerMapping, 8),
Expand Down
5 changes: 2 additions & 3 deletions common/remote/rpc/grpc_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package rpc
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/golang/protobuf/ptypes/any"
Expand Down Expand Up @@ -63,8 +62,8 @@ func (g *GrpcConnection) request(request rpc_request.IRequest, timeoutMills int6
responseFunc, ok := rpc_response.ClientResponseMapping[responsePayload.Metadata.GetType()]

if !ok {
return nil, errors.New(fmt.Sprintf("request:%s,unsupported response type:%s", request.GetRequestType(),
responsePayload.Metadata.GetType()))
return nil, errors.Errorf("request:%s,unsupported response type:%s", request.GetRequestType(),
responsePayload.Metadata.GetType())
}
response := responseFunc()
err = json.Unmarshal(responsePayload.GetBody().Value, response)
Expand Down
12 changes: 9 additions & 3 deletions common/remote/rpc/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,18 @@ func (r *RpcClient) Start() {
currentConnection.getServerInfo(), currentConnection.getConnectionId())
r.currentConnection = currentConnection
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.eventChan <- ConnectionEvent{eventType: CONNECTED}
r.asyncNotifyConnectionChange(CONNECTED)
} else {
r.switchServerAsync(ServerInfo{}, false)
}
}

func (r *RpcClient) asyncNotifyConnectionChange(eventType ConnectionStatus) {
go func() {
r.eventChan <- ConnectionEvent{eventType: eventType}
}()
}

func (r *RpcClient) notifyServerSrvChange() {
if r.currentConnection == nil {
r.switchServerAsync(ServerInfo{}, false)
Expand Down Expand Up @@ -339,7 +345,7 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
}
r.currentConnection = connectionNew
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.eventChan <- ConnectionEvent{eventType: CONNECTED}
r.asyncNotifyConnectionChange(CONNECTED)
return
}
if r.isShutdown() {
Expand All @@ -365,7 +371,7 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
func (r *RpcClient) closeConnection() {
if r.currentConnection != nil {
r.currentConnection.close()
r.eventChan <- ConnectionEvent{eventType: DISCONNECTED}
r.asyncNotifyConnectionChange(DISCONNECTED)
}
}

Expand Down

0 comments on commit 859cdce

Please sign in to comment.