diff --git a/common/remote/rpc/grpc_client.go b/common/remote/rpc/grpc_client.go index 3d38d130..a4a72459 100644 --- a/common/remote/rpc/grpc_client.go +++ b/common/remote/rpc/grpc_client.go @@ -46,15 +46,14 @@ type GrpcClient struct { func NewGrpcClient(ctx context.Context, clientName string, nacosServer *nacos_server.NacosServer) *GrpcClient { rpcClient := &GrpcClient{ &RpcClient{ - ctx: ctx, - name: clientName, - labels: make(map[string]string, 8), - rpcClientStatus: INITIALIZED, - eventChan: make(chan ConnectionEvent), - reconnectionChan: make(chan ReconnectContext), - nacosServer: nacosServer, - serverRequestHandlerMapping: make(map[string]ServerRequestHandlerMapping, 8), - mux: new(sync.Mutex), + ctx: ctx, + name: clientName, + labels: make(map[string]string, 8), + rpcClientStatus: INITIALIZED, + eventChan: make(chan ConnectionEvent), + reconnectionChan: make(chan ReconnectContext), + nacosServer: nacosServer, + mux: new(sync.Mutex), }, } rpcClient.RpcClient.lastActiveTimestamp.Store(time.Now()) @@ -240,12 +239,14 @@ func (c *GrpcClient) handleServerRequest(p *nacos_grpc_service.Payload, grpcConn client := c.GetRpcClient() payLoadType := p.GetMetadata().GetType() - mapping, ok := client.serverRequestHandlerMapping[payLoadType] + handlerMapping, ok := client.serverRequestHandlerMapping.Load(payLoadType) if !ok { logger.Errorf("%s Unsupported payload type", grpcConn.getConnectionId()) return } + mapping := handlerMapping.(ServerRequestHandlerMapping) + serverRequest := mapping.serverRequest() err := json.Unmarshal(p.GetBody().Value, serverRequest) if err != nil { diff --git a/common/remote/rpc/rpc_client.go b/common/remote/rpc/rpc_client.go index 880ce2d7..40496ca3 100644 --- a/common/remote/rpc/rpc_client.go +++ b/common/remote/rpc/rpc_client.go @@ -105,7 +105,7 @@ type RpcClient struct { lastActiveTimestamp atomic.Value executeClient IRpcClient nacosServer *nacos_server.NacosServer - serverRequestHandlerMapping map[string]ServerRequestHandlerMapping + serverRequestHandlerMapping sync.Map mux *sync.Mutex clientAbilities rpc_request.ClientAbilities Tenant string @@ -290,10 +290,10 @@ func (r *RpcClient) RegisterServerRequestHandler(request func() rpc_request.IReq return } logger.Debugf("%s register server push request:%s handler:%+v", r.name, requestType, handler.Name()) - r.serverRequestHandlerMapping[requestType] = ServerRequestHandlerMapping{ + r.serverRequestHandlerMapping.Store(requestType, ServerRequestHandlerMapping{ serverRequest: request, handler: handler, - } + }) } func (r *RpcClient) RegisterConnectionListener(listener IConnectionEventListener) {