Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 60 additions & 12 deletions internal/dcp/bootstrap/dcp_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"os"
"os/exec"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/go-logr/logr"

Expand Down Expand Up @@ -137,18 +139,13 @@ func DcpRun(
log.V(1).Info("About to launch host services")

shutdownErrors, lifecycleMsgs := host.RunAsync(hostCtx)
var shutdownHostOnce sync.Once
var shutdownHostErr error
shutdownHost := func() error {
cancelHostCtx()
var allErrors error

shutdownErr := <-shutdownErrors
if shutdownErr != nil {
log.Error(shutdownErr, "One or more hosted services failed to shut down gracefully")
allErrors = errors.Join(allErrors, shutdownErr)
}

allErrors = errors.Join(allErrors, apiServer.Dispose())
return allErrors
shutdownHostOnce.Do(func() {
shutdownHostErr = shutdownHostServicesAndDisposeApiServer(notifySrc, shutdownErrors, cancelHostCtx, apiServer.Dispose, hosting.ShutdownTimeout, log)
})
return shutdownHostErr
}

var err error
Expand All @@ -167,7 +164,7 @@ func DcpRun(
// there is no point trying to clean up all resources on shutdown because no actual resources are involved,
// it is all test mocks. Another case to avoid full cleanup is when shutdown request explicitly disables it.
if serverOnly || !resourceCleanup.IsFull() {
return nil // No cleanup needed, just return
return shutdownHost()
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 71b32e6 by guarding the shutdownHost closure with sync.Once. Even if a future path calls it more than once, it will only read shutdownErrors and dispose API server resources once.

}

err = appmgmt.CleanupAllResources(log)
Expand Down Expand Up @@ -199,6 +196,57 @@ func DcpRun(
}
}

func shutdownHostServicesAndDisposeApiServer(
notificationSource notifications.NotificationSource,
shutdownErrors <-chan error,
cancelHostCtx context.CancelFunc,
disposeApiServer func() error,
controllerShutdownTimeout time.Duration,
log logr.Logger,
) error {
var allErrors error

if notificationSource != nil {
notifyErr := notificationSource.NotifySubscribers(&notifications.ShutdownRequestedNotification{})
if notifyErr != nil {
log.Error(notifyErr, "Failed to request controller shutdown")
allErrors = errors.Join(allErrors, notifyErr)
} else {
log.V(1).Info("Requested controller shutdown")
}

shutdownTimer := time.NewTimer(controllerShutdownTimeout)
defer shutdownTimer.Stop()

select {
case shutdownErr := <-shutdownErrors:
if shutdownErr != nil {
log.Error(shutdownErr, "One or more hosted services failed to shut down gracefully")
allErrors = errors.Join(allErrors, shutdownErr)
}

case <-shutdownTimer.C:
shutdownErr := fmt.Errorf("controller host did not shut down within %s", controllerShutdownTimeout)
log.Error(shutdownErr, "Controller host shutdown timed out")
allErrors = errors.Join(allErrors, shutdownErr)
cancelHostCtx()
if hostShutdownErr := <-shutdownErrors; hostShutdownErr != nil {
log.Error(hostShutdownErr, "One or more hosted services failed to shut down after cancellation")
allErrors = errors.Join(allErrors, hostShutdownErr)
}
}
} else {
cancelHostCtx()
if shutdownErr := <-shutdownErrors; shutdownErr != nil {
log.Error(shutdownErr, "One or more hosted services failed to shut down gracefully")
allErrors = errors.Join(allErrors, shutdownErr)
}
}

allErrors = errors.Join(allErrors, disposeApiServer())
return allErrors
}

func createNotificationSource(lifetimeCtx context.Context, log logr.Logger) (notifications.UnixSocketNotificationSource, error) {
const noNotifications = "Notifications will not be sent to controller process"

Expand Down
135 changes: 135 additions & 0 deletions internal/dcp/bootstrap/dcp_run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package bootstrap

import (
"errors"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/microsoft/dcp/internal/notifications"
"github.com/microsoft/dcp/pkg/testutil"
)

func TestShutdownHostServicesWaitsForGracefulControllerExit(t *testing.T) {
ctx, cancel := testutil.GetTestContext(t, 5*time.Second)
defer cancel()

shutdownErrors := make(chan error, 1)
releaseHost := make(chan struct{})
disposed := make(chan struct{})
var hostCanceled atomic.Bool

notificationSource := notifications.NotifySubscribersFunc(func(n notifications.Notification) error {
require.Equal(t, notifications.NotificationKindShutdownRequested, n.Kind())
return nil
})

go func() {
<-releaseHost
shutdownErrors <- nil
}()

result := make(chan error, 1)
go func() {
result <- shutdownHostServicesAndDisposeApiServer(
notificationSource,
shutdownErrors,
func() { hostCanceled.Store(true) },
func() error {
close(disposed)
return nil
},
time.Second,
testutil.NewLogForTesting(t.Name()),
)
}()

select {
case <-disposed:
t.Fatal("API server was disposed before the controller host exited")
case <-time.After(100 * time.Millisecond):
}

close(releaseHost)

select {
case shutdownErr := <-result:
require.NoError(t, shutdownErr)
case <-ctx.Done():
t.Fatal("Timed out waiting for shutdown to complete")
}

require.False(t, hostCanceled.Load(), "host context should not be canceled after graceful controller exit")
}

func TestShutdownHostServicesCancelsHostAfterControllerShutdownTimeout(t *testing.T) {
ctx, cancel := testutil.GetTestContext(t, 5*time.Second)
defer cancel()

shutdownErrors := make(chan error, 1)
disposed := make(chan struct{})
var hostCanceled atomic.Bool

notificationSource := notifications.NotifySubscribersFunc(func(n notifications.Notification) error {
require.Equal(t, notifications.NotificationKindShutdownRequested, n.Kind())
return nil
})

shutdownErr := shutdownHostServicesAndDisposeApiServer(
notificationSource,
shutdownErrors,
func() {
hostCanceled.Store(true)
shutdownErrors <- nil
},
func() error {
close(disposed)
return nil
},
10*time.Millisecond,
testutil.NewLogForTesting(t.Name()),
)

require.Error(t, shutdownErr)
require.Contains(t, shutdownErr.Error(), "controller host did not shut down")
require.True(t, hostCanceled.Load(), "host context should be canceled after controller shutdown timeout")

select {
case <-disposed:
case <-ctx.Done():
t.Fatal("Timed out waiting for API server disposal")
}
}

func TestShutdownHostServicesDisposesApiServerWhenHostShutdownFails(t *testing.T) {
expectedErr := errors.New("host shutdown failed")
shutdownErrors := make(chan error, 1)
shutdownErrors <- expectedErr
disposed := make(chan struct{})

shutdownErr := shutdownHostServicesAndDisposeApiServer(
nil,
shutdownErrors,
func() {},
func() error {
close(disposed)
return nil
},
time.Second,
testutil.NewLogForTesting(t.Name()),
)

require.ErrorIs(t, shutdownErr, expectedErr)
select {
case <-disposed:
default:
t.Fatal("API server was not disposed")
}
}
12 changes: 8 additions & 4 deletions internal/dcpctrl/commands/run_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func runControllers(log logr.Logger) func(cmd *cobra.Command, _ []string) error
}
startInvalidPersistentExecutableRecordCleanup(ctrlCtx, stateStore, leaseOwner, log)

trySetupNotificationHandler(ctrlCtx, log)
trySetupNotificationHandler(ctrlCtx, ctrlCtxCancel, log)

mgr, err := getManager(ctrlCtx, log.V(1))
if err != nil {
Expand Down Expand Up @@ -338,7 +338,7 @@ func runControllers(log logr.Logger) func(cmd *cobra.Command, _ []string) error
}
}

func trySetupNotificationHandler(notifyCtx context.Context, log logr.Logger) {
func trySetupNotificationHandler(notifyCtx context.Context, cancelNotifyCtx context.CancelFunc, log logr.Logger) {
notifySocketPath := notifications.GetNotificationSocketPath()
if notifySocketPath == "" {
return
Expand All @@ -347,14 +347,14 @@ func trySetupNotificationHandler(notifyCtx context.Context, log logr.Logger) {
log.V(1).Info("Setting up notification receiver", "SocketPath", notifySocketPath)

_, nrErr := notifications.NewNotificationSubscription(notifyCtx, notifySocketPath, log.WithName("NotificationReceiver"), func(n notifications.Notification) {
handleNotification(notifyCtx, n, log)
handleNotification(notifyCtx, cancelNotifyCtx, n, log)
})
if nrErr != nil {
log.Error(nrErr, "Failed to create cleanup notification receiver")
}
}

func handleNotification(ctx context.Context, note notifications.Notification, log logr.Logger) {
func handleNotification(ctx context.Context, cancelCtx context.CancelFunc, note notifications.Notification, log logr.Logger) {
switch note.Kind() {

case notifications.NotificationKindCleanupStarted:
Expand All @@ -368,6 +368,10 @@ func handleNotification(ctx context.Context, note notifications.Notification, lo
}
}

case notifications.NotificationKindShutdownRequested:
log.Info("Received shutdown request notification, stopping controller manager...")
cancelCtx()

case notifications.NotificationKindPerftraceRequest:
perfTraceReq, ok := note.(*notifications.PerftraceRequestNotification)
if !ok {
Expand Down
37 changes: 37 additions & 0 deletions internal/dcpctrl/commands/run_controllers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package commands

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/microsoft/dcp/internal/notifications"
"github.com/microsoft/dcp/pkg/testutil"
)

func TestShutdownRequestedNotificationCancelsControllerContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

handleNotification(
ctx,
cancel,
&notifications.ShutdownRequestedNotification{},
testutil.NewLogForTesting(t.Name()),
)

select {
case <-ctx.Done():
case <-time.After(time.Second):
t.Fatal("Timed out waiting for controller context cancellation")
}

require.ErrorIs(t, ctx.Err(), context.Canceled)
}
21 changes: 19 additions & 2 deletions internal/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
type NotificationKind string

const (
NotificationKindCleanupStarted NotificationKind = "cleanup-started"
NotificationKindPerftraceRequest NotificationKind = "perftrace-request"
NotificationKindCleanupStarted NotificationKind = "cleanup-started"
NotificationKindPerftraceRequest NotificationKind = "perftrace-request"
NotificationKindShutdownRequested NotificationKind = "shutdown-requested"

NotificationSocketPathFlagName = "notification-socket"
)
Expand Down Expand Up @@ -62,6 +63,14 @@ func (n *CleanupStartedNotification) Kind() NotificationKind {
return NotificationKindCleanupStarted
}

type ShutdownRequestedNotification struct{}

var _ Notification = (*ShutdownRequestedNotification)(nil)

func (n *ShutdownRequestedNotification) Kind() NotificationKind {
return NotificationKindShutdownRequested
}

type PerftraceRequestNotification struct {
Duration time.Duration
}
Expand All @@ -84,6 +93,11 @@ func asNotificationData(n Notification) (*proto.NotificationData, error) {
Ntype: grpcutil.EnumVal(proto.NotificationType_NTYPE_CLEANUP_STARTED),
}, nil

case NotificationKindShutdownRequested:
return &proto.NotificationData{
Ntype: grpcutil.EnumVal(proto.NotificationType_NTYPE_SHUTDOWN_REQUESTED),
}, nil

case NotificationKindPerftraceRequest:
req := n.(*PerftraceRequestNotification)
return &proto.NotificationData{
Expand Down Expand Up @@ -111,6 +125,9 @@ func asNotification(nd *proto.NotificationData) (Notification, error) {
case proto.NotificationType_NTYPE_CLEANUP_STARTED:
return &CleanupStartedNotification{}, nil

case proto.NotificationType_NTYPE_SHUTDOWN_REQUESTED:
return &ShutdownRequestedNotification{}, nil

case proto.NotificationType_NTYPE_PERFTRACE_REQUEST:
ptr := nd.GetPerftraceRequest()
if ptr == nil {
Expand Down
Loading
Loading