Skip to content

Commit

Permalink
修复初始化并发读写的情况 (#591)
Browse files Browse the repository at this point in the history
* 修复初始化并发写的情况
  • Loading branch information
tonglin96 authored Mar 23, 2023
1 parent 43cf5ae commit e25bc91
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
21 changes: 11 additions & 10 deletions common/remote/rpc/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ type GrpcClient struct {
func NewGrpcClient(ctx context.Context, clientName string, nacosServer *nacos_server.NacosServer) *GrpcClient {
rpcClient := &GrpcClient{
&RpcClient{
ctx: ctx,
name: clientName,
labels: make(map[string]string, 8),
rpcClientStatus: INITIALIZED,
eventChan: make(chan ConnectionEvent),
reconnectionChan: make(chan ReconnectContext),
nacosServer: nacosServer,
serverRequestHandlerMapping: make(map[string]ServerRequestHandlerMapping, 8),
mux: new(sync.Mutex),
ctx: ctx,
name: clientName,
labels: make(map[string]string, 8),
rpcClientStatus: INITIALIZED,
eventChan: make(chan ConnectionEvent),
reconnectionChan: make(chan ReconnectContext),
nacosServer: nacosServer,
mux: new(sync.Mutex),
},
}
rpcClient.RpcClient.lastActiveTimestamp.Store(time.Now())
Expand Down Expand Up @@ -240,12 +239,14 @@ func (c *GrpcClient) handleServerRequest(p *nacos_grpc_service.Payload, grpcConn
client := c.GetRpcClient()
payLoadType := p.GetMetadata().GetType()

mapping, ok := client.serverRequestHandlerMapping[payLoadType]
handlerMapping, ok := client.serverRequestHandlerMapping.Load(payLoadType)
if !ok {
logger.Errorf("%s Unsupported payload type", grpcConn.getConnectionId())
return
}

mapping := handlerMapping.(ServerRequestHandlerMapping)

serverRequest := mapping.serverRequest()
err := json.Unmarshal(p.GetBody().Value, serverRequest)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions common/remote/rpc/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type RpcClient struct {
lastActiveTimestamp atomic.Value
executeClient IRpcClient
nacosServer *nacos_server.NacosServer
serverRequestHandlerMapping map[string]ServerRequestHandlerMapping
serverRequestHandlerMapping sync.Map
mux *sync.Mutex
clientAbilities rpc_request.ClientAbilities
Tenant string
Expand Down Expand Up @@ -290,10 +290,10 @@ func (r *RpcClient) RegisterServerRequestHandler(request func() rpc_request.IReq
return
}
logger.Debugf("%s register server push request:%s handler:%+v", r.name, requestType, handler.Name())
r.serverRequestHandlerMapping[requestType] = ServerRequestHandlerMapping{
r.serverRequestHandlerMapping.Store(requestType, ServerRequestHandlerMapping{
serverRequest: request,
handler: handler,
}
})
}

func (r *RpcClient) RegisterConnectionListener(listener IConnectionEventListener) {
Expand Down

0 comments on commit e25bc91

Please sign in to comment.