Skip to content

Commit

Permalink
Initiate Dapr shutdown after expiry of grace period - Issue 5481 (dap…
Browse files Browse the repository at this point in the history
…r#5562)

* Kick off shutdown after expiry of grace period

Signed-off-by: Akhila Chetlapalle <[email protected]>

* update shutdown with grace period. Add tests for pubsub actors and bindings

Signed-off-by: Akhila Chetlapalle <[email protected]>

* fix linting and windows incompatibility

Signed-off-by: Akhila Chetlapalle <[email protected]>

* fixed tests on windows. SIGTERM sending fails on windows. So invoke shutdown for windows tests

Signed-off-by: Akhila Chetlapalle <[email protected]>

* review comments incorporated

Signed-off-by: Akhila Chetlapalle <[email protected]>

* review comments incorporated

Signed-off-by: Akhila Chetlapalle <[email protected]>

* removed comment

Signed-off-by: Akhila Chetlapalle <[email protected]>

* update branch and add pubsub order check

Signed-off-by: Akhila Chetlapalle <[email protected]>

* Fixed trace initiation and shutdown. Updated trace Registration interface to return Tracer

Signed-off-by: Akhila Chetlapalle <[email protected]>

* reverting timeout pushed in test and moving trace shutdown to seperate method, moving trace shutdown to after api shutdown and removing go routine

Signed-off-by: Akhila Chetlapalle <[email protected]>

* re-trigger pipeline

Signed-off-by: Akhila Chetlapalle <[email protected]>

Signed-off-by: Akhila Chetlapalle <[email protected]>
Co-authored-by: Akhila Chetlapalle <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
Co-authored-by: Alessandro (Ale) Segala <[email protected]>
Co-authored-by: Loong Dai <[email protected]>
Co-authored-by: Mukundan Sundararajan <[email protected]>
Co-authored-by: Artur Souza <[email protected]>
  • Loading branch information
7 people authored Dec 13, 2022
1 parent 64d468a commit 374a582
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 47 deletions.
45 changes: 28 additions & 17 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,7 @@ func (a *DaprRuntime) setupTracing(hostAddress string, tpStore tracerProviderSto

tpStore.RegisterSampler(daprTraceSampler)

tpStore.RegisterTracerProvider()

a.tracerProvider = tpStore.RegisterTracerProvider()
return nil
}

Expand Down Expand Up @@ -2656,32 +2655,26 @@ func (a *DaprRuntime) cleanSocket() {
func (a *DaprRuntime) Shutdown(duration time.Duration) {
// Ensure the Unix socket file is removed if a panic occurs.
defer a.cleanSocket()

log.Info("dapr shutting down.")

log.Info("Dapr shutting down")
log.Info("Stopping PubSub subscribers and input bindings")
a.stopSubscriptions()
a.stopReadingFromBindings()
a.cancel()

log.Info("Initiating actor shutdown")
a.stopActor()

log.Infof("Holding shutdown for %s to allow graceful stop of outstanding operations", duration.String())
<-time.After(duration)

log.Info("Stopping Dapr APIs")
for _, closer := range a.apiClosers {
if err := closer.Close(); err != nil {
log.Warnf("error closing API: %v", err)
}
}

shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
go func() {
if a.tracerProvider != nil {
a.tracerProvider.Shutdown(shutdownCtx)
}
}()

log.Infof("Waiting %s to finish outstanding operations", duration)
<-time.After(duration)
shutdownCancel()
a.stopTrace()
a.shutdownOutputComponents()
a.cancel()
a.shutdownC <- nil
}

Expand Down Expand Up @@ -3110,3 +3103,21 @@ func createGRPCManager(runtimeConfig *Config, globalConfig *config.Configuration
m.StartCollector()
return m
}

func (a *DaprRuntime) stopTrace() {
if a.tracerProvider == nil {
return
}
// Flush and shutdown the tracing provider.
shutdownCtx, shutdownCancel := context.WithCancel(a.ctx)
defer shutdownCancel()
if err := a.tracerProvider.ForceFlush(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) {
log.Warnf("error flushing tracing provider: %v", err)
}

if err := a.tracerProvider.Shutdown(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) {
log.Warnf("error shutting down tracing provider: %v", err)
} else {
a.tracerProvider = nil
}
}
219 changes: 192 additions & 27 deletions pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,32 +744,9 @@ func TestInitState(t *testing.T) {
},
}

initMockStateStoreForRuntime := func(rt *DaprRuntime, e error) *daprt.MockStateStore {
mockStateStore := new(daprt.MockStateStore)

rt.stateStoreRegistry.RegisterComponent(
func(_ logger.Logger) state.Store {
return mockStateStore
},
"mockState",
)

expectedMetadata := state.Metadata{Base: mdata.Base{
Name: TestPubsubName,
Properties: map[string]string{
actorStateStore: "true",
"primaryEncryptionKey": primaryKey,
},
}}

mockStateStore.On("Init", expectedMetadata).Return(e)

return mockStateStore
}

t.Run("test init state store", func(t *testing.T) {
// setup
initMockStateStoreForRuntime(rt, nil)
initMockStateStoreForRuntime(rt, primaryKey, nil)

// act
err := rt.initState(mockStateComponent)
Expand All @@ -780,7 +757,7 @@ func TestInitState(t *testing.T) {

t.Run("test init state store error", func(t *testing.T) {
// setup
initMockStateStoreForRuntime(rt, assert.AnError)
initMockStateStoreForRuntime(rt, primaryKey, assert.AnError)

// act
err := rt.initState(mockStateComponent)
Expand All @@ -792,7 +769,7 @@ func TestInitState(t *testing.T) {

t.Run("test init state store, encryption not enabled", func(t *testing.T) {
// setup
initMockStateStoreForRuntime(rt, nil)
initMockStateStoreForRuntime(rt, primaryKey, nil)

// act
err := rt.initState(mockStateComponent)
Expand All @@ -805,7 +782,7 @@ func TestInitState(t *testing.T) {

t.Run("test init state store, encryption enabled", func(t *testing.T) {
// setup
initMockStateStoreForRuntime(rt, nil)
initMockStateStoreForRuntime(rt, primaryKey, nil)

rt.secretStores["mockSecretStore"] = &mockSecretStore{}

Expand Down Expand Up @@ -5400,3 +5377,191 @@ func TestNamespacedPublisher(t *testing.T) {

assert.Equal(t, "ns1topic0", rt.pubSubs[TestPubsubName].component.(*mockPublishPubSub).PublishedRequest.Load().Topic)
}

func TestGracefulShutdownPubSub(t *testing.T) {
rt := NewTestDaprRuntime(modes.StandaloneMode)
mockPubSub := new(daprt.MockPubSub)
rt.pubSubRegistry.RegisterComponent(
func(_ logger.Logger) pubsub.PubSub {
return mockPubSub
},
"mockPubSub",
)
mockPubSub.On("Init", mock.Anything).Return(nil)
mockPubSub.On("Subscribe", mock.AnythingOfType("pubsub.SubscribeRequest"), mock.AnythingOfType("pubsub.Handler")).Return(nil)
mockPubSub.On("Close").Return(nil)

cPubSub := componentsV1alpha1.Component{}
cPubSub.ObjectMeta.Name = "mockPubSub"
cPubSub.Spec.Type = "pubsub.mockPubSub"

req := invokev1.NewInvokeMethodRequest("dapr/subscribe")
req.WithHTTPExtension(http.MethodGet, "")
req.WithRawData(nil, invokev1.JSONContentType)
subscriptionItems := []runtimePubsub.SubscriptionJSON{
{PubsubName: "mockPubSub", Topic: "topic0", Route: "shutdown"},
}
sub, _ := json.Marshal(subscriptionItems)
fakeResp := invokev1.NewInvokeMethodResponse(200, "OK", nil)
fakeResp.WithRawData(sub, "application/json")

mockAppChannel := new(channelt.MockAppChannel)
rt.appChannel = mockAppChannel
mockAppChannel.On("InvokeMethod", mock.MatchedBy(matchContextInterface), req).Return(fakeResp, nil)

require.NoError(t, rt.initPubSub(cPubSub))
mockPubSub.AssertCalled(t, "Init", mock.Anything)
rt.startSubscriptions()
mockPubSub.AssertCalled(t, "Subscribe", mock.AnythingOfType("pubsub.SubscribeRequest"), mock.AnythingOfType("pubsub.Handler"))
assert.NotNil(t, rt.pubsubCtx)
assert.NotNil(t, rt.topicCtxCancels)
assert.NotNil(t, rt.topicRoutes)
go sendSigterm(rt)
select {
case <-rt.pubsubCtx.Done():
assert.Nil(t, rt.pubsubCtx)
assert.Nil(t, rt.topicCtxCancels)
assert.Nil(t, rt.topicRoutes)
case <-time.After(rt.runtimeConfig.GracefulShutdownDuration + 2*time.Second):
assert.Fail(t, "pubsub shutdown timed out")
}
}

func TestGracefulShutdownBindings(t *testing.T) {
rt := NewTestDaprRuntime(modes.StandaloneMode)

rt.bindingsRegistry.RegisterInputBinding(
func(_ logger.Logger) bindings.InputBinding {
return &daprt.MockBinding{}
},
"testInputBinding",
)
cin := componentsV1alpha1.Component{}
cin.ObjectMeta.Name = "testInputBinding"
cin.Spec.Type = "bindings.testInputBinding"

rt.bindingsRegistry.RegisterOutputBinding(
func(_ logger.Logger) bindings.OutputBinding {
return &daprt.MockBinding{}
},
"testOutputBinding",
)
cout := componentsV1alpha1.Component{}
cout.ObjectMeta.Name = "testOutputBinding"
cout.Spec.Type = "bindings.testOutputBinding"
require.NoError(t, rt.initInputBinding(cin))
require.NoError(t, rt.initOutputBinding(cout))

assert.Equal(t, len(rt.inputBindings), 1)
assert.Equal(t, len(rt.outputBindings), 1)

go sendSigterm(rt)
<-time.After(rt.runtimeConfig.GracefulShutdownDuration)
assert.Nil(t, rt.inputBindingsCancel)
assert.Nil(t, rt.inputBindingsCtx)
}

func TestGracefulShutdownActors(t *testing.T) {
rt := NewTestDaprRuntime(modes.StandaloneMode)

bytes := make([]byte, 32)
rand.Read(bytes)
encryptKey := hex.EncodeToString(bytes)

mockStateComponent := componentsV1alpha1.Component{
ObjectMeta: metaV1.ObjectMeta{
Name: TestPubsubName,
},
Spec: componentsV1alpha1.ComponentSpec{
Type: "state.mockState",
Version: "v1",
Metadata: []componentsV1alpha1.MetadataItem{
{
Name: "actorStateStore",
Value: componentsV1alpha1.DynamicValue{
JSON: v1.JSON{Raw: []byte("true")},
},
},
{
Name: "primaryEncryptionKey",
Value: componentsV1alpha1.DynamicValue{
JSON: v1.JSON{Raw: []byte(encryptKey)},
},
},
},
},
Auth: componentsV1alpha1.Auth{
SecretStore: "mockSecretStore",
},
}

// setup
initMockStateStoreForRuntime(rt, encryptKey, nil)

// act
err := rt.initState(mockStateComponent)

// assert
assert.NoError(t, err, "expected no error")

rt.namespace = "test"
rt.runtimeConfig.mtlsEnabled = true
assert.Nil(t, rt.initActors())

go sendSigterm(rt)
<-time.After(rt.runtimeConfig.GracefulShutdownDuration + 3*time.Second)

var activeActCount int
activeActors := rt.actor.GetActiveActorsCount(rt.ctx)
for _, v := range activeActors {
activeActCount += v.Count
}
assert.Equal(t, activeActCount, 0)
}

func initMockStateStoreForRuntime(rt *DaprRuntime, encryptKey string, e error) *daprt.MockStateStore {
mockStateStore := new(daprt.MockStateStore)

rt.stateStoreRegistry.RegisterComponent(
func(_ logger.Logger) state.Store {
return mockStateStore
},
"mockState",
)

expectedMetadata := state.Metadata{Base: mdata.Base{
Name: TestPubsubName,
Properties: map[string]string{
actorStateStore: "true",
"primaryEncryptionKey": encryptKey,
},
}}

mockStateStore.On("Init", expectedMetadata).Return(e)

return mockStateStore
}

func TestTraceShutdown(t *testing.T) {
rt := NewTestDaprRuntime(modes.StandaloneMode)
rt.globalConfig.Spec.TracingSpec = config.TracingSpec{
Otel: config.OtelSpec{
EndpointAddress: "foo.bar",
IsSecure: false,
Protocol: "http",
},
}
rt.hostAddress = "localhost:3000"
tpStore := newOpentelemetryTracerProviderStore()
require.NoError(t, rt.setupTracing(rt.hostAddress, tpStore))
assert.NotNil(t, rt.tracerProvider)

go sendSigterm(rt)
<-rt.ctx.Done()
assert.Nil(t, rt.tracerProvider)
}

func sendSigterm(rt *DaprRuntime) {
rt.runtimeConfig.GracefulShutdownDuration = 5 * time.Second
rt.Shutdown(rt.runtimeConfig.GracefulShutdownDuration)
}
8 changes: 5 additions & 3 deletions pkg/runtime/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type tracerProviderStore interface {
RegisterExporter(exporter sdktrace.SpanExporter)
RegisterResource(res *resource.Resource)
RegisterSampler(sampler sdktrace.Sampler)
RegisterTracerProvider()
RegisterTracerProvider() *sdktrace.TracerProvider
}

// newOpentelemetryTracerProviderStore returns an opentelemetryOptionsStore
Expand Down Expand Up @@ -61,7 +61,7 @@ func (s *opentelemetryTracerProviderStore) RegisterSampler(sampler sdktrace.Samp
}

// RegisterTraceProvider registers a trace provider as per the tracer options in the store
func (s *opentelemetryTracerProviderStore) RegisterTracerProvider() {
func (s *opentelemetryTracerProviderStore) RegisterTracerProvider() *sdktrace.TracerProvider {
if len(s.exporters) != 0 {
tracerOptions := []sdktrace.TracerProviderOption{}
for _, exporter := range s.exporters {
Expand All @@ -79,7 +79,9 @@ func (s *opentelemetryTracerProviderStore) RegisterTracerProvider() {
tp := sdktrace.NewTracerProvider(tracerOptions...)

otel.SetTracerProvider(tp)
return tp
}
return nil
}

// fakeTracerOptionsStore implements tracerOptionsStore by merely record the exporters
Expand Down Expand Up @@ -114,4 +116,4 @@ func (s *fakeTracerProviderStore) RegisterSampler(sampler sdktrace.Sampler) {
}

// RegisterTraceProvider does nothing
func (s *fakeTracerProviderStore) RegisterTracerProvider() {}
func (s *fakeTracerProviderStore) RegisterTracerProvider() *sdktrace.TracerProvider { return nil }

0 comments on commit 374a582

Please sign in to comment.