Skip to content

Commit

Permalink
Fix/placement close stream (dapr#5592)
Browse files Browse the repository at this point in the history
* Refact placement client

Signed-off-by: Marcos Candeia <[email protected]>

* Add placement client and implement stream draining properly

Signed-off-by: Marcos Candeia <[email protected]>

Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Dec 5, 2022
1 parent 31b83bf commit da5706c
Show file tree
Hide file tree
Showing 5 changed files with 476 additions and 204 deletions.
176 changes: 176 additions & 0 deletions pkg/actors/internal/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
"context"
"sync"

v1pb "github.com/dapr/dapr/pkg/proto/placement/v1"

"google.golang.org/grpc"
)

// placementClient wraps the gRPC stream used to report status to the placement service.
// It uses exclusion locks to prevent concurrent Recv calls and to prevent Send from running
// concurrently with CloseSend, it closes a stream by draining the remaining events until an
// error is received, and it broadcasts on a condition variable whenever the stream gets
// connected or disconnected.
type placementClient struct {
// getGrpcOpts returns the options that should be used to connect to the placement service.
getGrpcOpts func() ([]grpc.DialOption, error)

// recvLock prevents concurrent access to the Recv() method.
recvLock *sync.Mutex
// sendLock prevents CloseSend and Send being used concurrently.
sendLock *sync.Mutex

// clientStream is the client side stream.
clientStream v1pb.Placement_ReportDaprStatusClient
// clientConn is the gRPC client connection.
clientConn *grpc.ClientConn
// streamConnAlive reports whether the stream connection is alive.
// It is read and written while holding streamConnectedCond.L.
streamConnAlive bool
// streamConnectedCond is the condition variable for goroutines waiting for or announcing
// that the stream between runtime and placement is connected (or disconnected).
streamConnectedCond *sync.Cond

// ctx is the context the current stream was created with.
ctx context.Context
// cancel is the cancel func for ctx.
cancel context.CancelFunc
}

// connectToServer initializes a new connection to the target server and, only if both the
// dial and the stream creation succeed, replaces the current stream with the connected
// stream and broadcasts that the stream is alive.
//
// The error returned by getGrpcOpts, grpc.Dial or ReportDaprStatus is returned unchanged.
// On failure the client state is left untouched and every resource created by this call
// (connection and stream context) is released.
func (c *placementClient) connectToServer(serverAddr string) error {
	opts, err := c.getGrpcOpts()
	if err != nil {
		return err
	}

	conn, err := grpc.Dial(serverAddr, opts...)
	if err != nil {
		if conn != nil {
			conn.Close()
		}
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())
	client := v1pb.NewPlacementClient(conn)
	stream, err := client.ReportDaprStatus(ctx)
	if err != nil {
		// The dial succeeded but the stream could not be opened: release the stream
		// context AND the connection, otherwise the connection is leaked on every
		// failed attempt.
		cancel()
		conn.Close()
		return err
	}

	// Publish the new stream under the condition lock so that waiters observe a
	// consistent (ctx, cancel, stream, conn, alive) state.
	c.streamConnectedCond.L.Lock()
	defer c.streamConnectedCond.L.Unlock()
	c.ctx, c.cancel, c.clientStream, c.clientConn = ctx, cancel, stream, conn
	c.streamConnAlive = true
	c.streamConnectedCond.Broadcast()
	return nil
}

// drain closes the given stream and consumes it until it reports an error, as described
// in the grpc-go documentation:
// https://github.com/grpc/grpc-go/blob/be1fb4f27549f736b9b4ec26104c7c6b29845ad0/stream.go#L109
//
// The order is: close the send side (under sendLock), close the connection, cancel the
// stream context, and finally receive (under recvLock) until RecvMsg returns an error.
func (c *placementClient) drain(stream grpc.ClientStream, conn *grpc.ClientConn, cancelFunc context.CancelFunc) {
	// CloseSend cannot be invoked concurrently with Send(), so it is guarded by sendLock.
	c.sendLock.Lock()
	stream.CloseSend()
	c.sendLock.Unlock()

	conn.Close()
	cancelFunc()

	// Recv cannot be invoked concurrently with another Recv, so the loop holds recvLock.
	c.recvLock.Lock()
	defer c.recvLock.Unlock()
	for stream.RecvMsg(struct{}{}) == nil {
		// Keep receiving until the stream reports an error.
	}
}

// noop is a function that does nothing; disconnect passes it to disconnectFn as the callback.
var noop = func() {}

// disconnect disconnects from the current server without any additional operation:
// it delegates to disconnectFn with a callback that does nothing.
func (c *placementClient) disconnect() {
c.disconnectFn(noop)
}

// disconnectFn disconnects from the current server and provides a way to run a function
// while the connection-state lock is still held.
//
// If the stream is already disconnected it only broadcasts to the waiters and returns;
// insideLockFn is NOT executed in that case. Otherwise it drains the current stream, marks
// the stream as not alive, broadcasts the change and then calls insideLockFn.
//
// insideLockFn runs with streamConnectedCond.L held, so it must not call anything that
// acquires that lock again (isConnected, waitUntil, recv, send, disconnect, ...).
func (c *placementClient) disconnectFn(insideLockFn func()) {
	c.streamConnectedCond.L.Lock()
	// Deferred so the lock is released on every path, including a panic raised by drain
	// or by the caller-supplied insideLockFn; otherwise all other users of the lock would
	// block forever.
	defer c.streamConnectedCond.L.Unlock()

	if !c.streamConnAlive {
		c.streamConnectedCond.Broadcast()
		return
	}

	c.drain(c.clientStream, c.clientConn, c.cancel)

	c.streamConnAlive = false
	c.streamConnectedCond.Broadcast()

	insideLockFn()
}

// isConnected reports whether the stream is currently marked as alive, i.e. whether this
// instance is connected to a placement server. The flag is read under the condition lock.
func (c *placementClient) isConnected() bool {
	c.streamConnectedCond.L.Lock()
	alive := c.streamConnAlive
	c.streamConnectedCond.L.Unlock()
	return alive
}

// waitUntil blocks the caller until predicate, evaluated against the current stream status,
// returns true. The predicate is evaluated with the condition lock held and is re-evaluated
// every time the condition variable wakes the caller up.
func (c *placementClient) waitUntil(predicate func(streamConnAlive bool) bool) {
	cond := c.streamConnectedCond

	cond.L.Lock()
	for {
		if predicate(c.streamConnAlive) {
			break
		}
		cond.Wait()
	}
	cond.L.Unlock()
}

// recv receives the next placement order from the current stream. The stream reference is
// captured under the condition lock, and the receive itself is serialized with recvLock so
// it never runs in parallel with another receive or with the draining loop.
func (c *placementClient) recv() (*v1pb.PlacementOrder, error) {
	cond := c.streamConnectedCond
	cond.L.Lock()
	current := c.clientStream
	cond.L.Unlock()

	// Recv cannot be called in parallel.
	c.recvLock.Lock()
	defer c.recvLock.Unlock()
	order, err := current.Recv()
	return order, err
}

// send sends host over the current stream. The stream reference is captured under the
// condition lock, and the send itself is serialized with sendLock so it never runs in
// parallel with another send or with `CloseSend`.
func (c *placementClient) send(host *v1pb.Host) error {
	cond := c.streamConnectedCond
	cond.L.Lock()
	current := c.clientStream
	cond.L.Unlock()

	// Send and CloseSend cannot be called in parallel.
	c.sendLock.Lock()
	defer c.sendLock.Unlock()
	err := current.Send(host)
	return err
}

// newPlacementClient builds a disconnected placement client that will obtain its dial
// options from optionGetter. The locks and the condition variable are allocated here; the
// stream, connection and context stay unset until connectToServer succeeds.
func newPlacementClient(optionGetter func() ([]grpc.DialOption, error)) *placementClient {
	client := new(placementClient)
	client.getGrpcOpts = optionGetter
	client.recvLock = new(sync.Mutex)
	client.sendLock = new(sync.Mutex)
	client.streamConnectedCond = sync.NewCond(new(sync.Mutex))
	// streamConnAlive keeps its zero value (false): a new client is not connected.
	return client
}
119 changes: 119 additions & 0 deletions pkg/actors/internal/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

// TestConnectToServer covers the failure paths of connectToServer (option getter error,
// dial error, stream error) and checks that a successful connection is broadcast to
// goroutines blocked in waitUntil.
func TestConnectToServer(t *testing.T) {
	t.Run("when grpc get opts return an error connectToServer should return an error", func(t *testing.T) {
		client := newPlacementClient(func() ([]grpc.DialOption, error) {
			return nil, errEstablishingTLSConn
		})
		// testify expects (expected, actual); the right order keeps failure output readable.
		assert.Equal(t, errEstablishingTLSConn, client.connectToServer(""))
	})
	t.Run("when grpc dial returns an error connectToServer should return an error", func(t *testing.T) {
		client := newPlacementClient(func() ([]grpc.DialOption, error) {
			return []grpc.DialOption{}, nil
		})

		assert.Error(t, client.connectToServer(""))
	})
	t.Run("when new placement stream returns an error connectToServer should return an error", func(t *testing.T) {
		client := newPlacementClient(func() ([]grpc.DialOption, error) {
			return []grpc.DialOption{}, nil
		})
		addr, cleanup := newTestServerWithOpts() // do not register the placement stream server
		defer cleanup()
		assert.Error(t, client.connectToServer(addr))
	})
	t.Run("when connectToServer succeeds it should broadcast that a new connection is alive", func(t *testing.T) {
		// NOTE(review): the previous comment here said the placement stream server is not
		// registered, which looks copy-pasted from the subtest above — confirm against
		// newTestServer.
		addr, _, cleanup := newTestServer()
		defer cleanup()

		client := newPlacementClient(getGrpcOptsGetter([]string{addr}, nil))

		var ready sync.WaitGroup
		ready.Add(1)
		go func() {
			client.waitUntil(func(streamConnAlive bool) bool {
				return streamConnAlive
			})
			ready.Done()
		}()

		// Bail out on failure: without a successful connection nothing ever broadcasts
		// and ready.Wait() below would block until the test binary times out.
		if !assert.NoError(t, client.connectToServer(addr)) {
			return
		}
		ready.Wait() // should not timeout
		// Read the flag through the accessor so the read happens under the lock.
		assert.True(t, client.isConnected())
	})
}

// TestDisconnect checks both branches of disconnectFn: the early return when the stream is
// already not alive (callback skipped, waiters still notified) and the full disconnection
// (stream drained, connection shut down, callback executed inside the lock).
func TestDisconnect(t *testing.T) {
	t.Run("disconnectFn should return and broadcast when connection is not alive", func(t *testing.T) {
		client := newPlacementClient(func() ([]grpc.DialOption, error) {
			return nil, nil
		})
		// Start as "alive" so the waiter below may block until disconnectFn broadcasts.
		client.streamConnAlive = true

		called := false
		shouldNotBeCalled := func() {
			called = true
		}
		var ready sync.WaitGroup
		ready.Add(1)

		go func() {
			client.waitUntil(func(streamConnAlive bool) bool {
				return !streamConnAlive
			})
			ready.Done()
		}()

		// The waiter goroutine reads streamConnAlive under the condition lock, so the
		// flag must be flipped under the same lock to avoid a data race.
		client.streamConnectedCond.L.Lock()
		client.streamConnAlive = false
		client.streamConnectedCond.L.Unlock()

		client.disconnectFn(shouldNotBeCalled)
		ready.Wait()
		assert.False(t, called)
	})
	t.Run("disconnectFn should broadcast not connected when disconnected and should drain and execute func inside lock", func(t *testing.T) {
		// NOTE(review): the previous comment here said the placement stream server is not
		// registered, which looks copy-pasted from another test — confirm against
		// newTestServer.
		addr, _, cleanup := newTestServer()
		defer cleanup()

		client := newPlacementClient(getGrpcOptsGetter([]string{addr}, nil))
		// Bail out on failure: without a connection clientConn is nil and the GetState
		// call below would panic instead of reporting a clean test failure.
		if !assert.NoError(t, client.connectToServer(addr)) {
			return
		}

		called := false
		shouldBeCalled := func() {
			called = true
		}

		var ready sync.WaitGroup
		ready.Add(1)

		go func() {
			client.waitUntil(func(streamConnAlive bool) bool {
				return !streamConnAlive
			})
			ready.Done()
		}()
		client.disconnectFn(shouldBeCalled)
		ready.Wait()
		// testify expects (expected, actual).
		assert.Equal(t, connectivity.Shutdown, client.clientConn.GetState())
		assert.True(t, called)
	})
}
68 changes: 68 additions & 0 deletions pkg/actors/internal/grpc_opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
"errors"
"strings"
"sync"

daprCredentials "github.com/dapr/dapr/pkg/credentials"
diag "github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/dapr/pkg/runtime/security"

"google.golang.org/grpc"
)

// errEstablishingTLSConn is the error returned by the function built by getGrpcOptsGetter
// when the client options for the placement connection cannot be obtained.
var errEstablishingTLSConn = errors.New("failed to establish TLS credentials for actor placement service")

// getGrpcOptsGetter returns a function that builds the gRPC dial options for the placement
// service. The options are built on the first successful call and memoized; later calls
// return the cached slice. A failed build is not cached, so the next call tries again.
//
// servers is the list of placement addresses and clientCert the certificate chain handed
// to daprCredentials.GetClientOptions. The returned function yields errEstablishingTLSConn
// when the client options cannot be obtained.
func getGrpcOptsGetter(servers []string, clientCert *daprCredentials.CertChain) func() ([]grpc.DialOption, error) {
	var (
		lock     sync.RWMutex
		dialOpts []grpc.DialOption
	)

	// readCached returns the memoized options (nil when not built yet) under the read lock.
	readCached := func() []grpc.DialOption {
		lock.RLock()
		defer lock.RUnlock()
		return dialOpts
	}

	return func() ([]grpc.DialOption, error) {
		if hit := readCached(); hit != nil {
			return hit, nil
		}

		lock.Lock()
		defer lock.Unlock()

		// Check again under the write lock: another goroutine may have built the
		// options between the read above and acquiring the write lock.
		if dialOpts != nil {
			return dialOpts, nil
		}

		built, err := daprCredentials.GetClientOptions(clientCert, security.TLSServerName)
		if err != nil {
			log.Errorf("%s: %s", errEstablishingTLSConn, err)
			return nil, errEstablishingTLSConn
		}

		if diag.DefaultGRPCMonitoring.IsEnabled() {
			interceptor := diag.DefaultGRPCMonitoring.UnaryClientInterceptor()
			built = append(built, grpc.WithUnaryInterceptor(interceptor))
		}

		// In Kubernetes environment, dapr-placement headless service resolves multiple IP addresses.
		// With round robin load balancer, Dapr can find the leader automatically.
		singleDNSTarget := len(servers) == 1 && strings.HasPrefix(servers[0], "dns:///")
		if singleDNSTarget {
			built = append(built, grpc.WithDefaultServiceConfig(grpcServiceConfig))
		}

		dialOpts = built
		return dialOpts, nil
	}
}
Loading

0 comments on commit da5706c

Please sign in to comment.