diff --git a/config/base/crds/full/numaflow.numaproj.io_interstepbufferservices.yaml b/config/base/crds/full/numaflow.numaproj.io_interstepbufferservices.yaml index 993551ebbc..5e1c014d71 100644 --- a/config/base/crds/full/numaflow.numaproj.io_interstepbufferservices.yaml +++ b/config/base/crds/full/numaflow.numaproj.io_interstepbufferservices.yaml @@ -3174,6 +3174,7 @@ spec: - Pending - Running - Failed + - Deleting type: string type: type: string diff --git a/config/install.yaml b/config/install.yaml index 11653adc71..8b354dba04 100644 --- a/config/install.yaml +++ b/config/install.yaml @@ -3173,6 +3173,7 @@ spec: - Pending - Running - Failed + - Deleting type: string type: type: string diff --git a/config/namespace-install.yaml b/config/namespace-install.yaml index 025269f458..3ee395c7ff 100644 --- a/config/namespace-install.yaml +++ b/config/namespace-install.yaml @@ -3173,6 +3173,7 @@ spec: - Pending - Running - Failed + - Deleting type: string type: type: string diff --git a/pkg/apis/numaflow/v1alpha1/isbsvc_types.go b/pkg/apis/numaflow/v1alpha1/isbsvc_types.go index 56cbde600d..5e5165b63d 100644 --- a/pkg/apis/numaflow/v1alpha1/isbsvc_types.go +++ b/pkg/apis/numaflow/v1alpha1/isbsvc_types.go @@ -20,14 +20,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// +kubebuilder:validation:Enum="";Pending;Running;Failed +// +kubebuilder:validation:Enum="";Pending;Running;Failed;Deleting type ISBSvcPhase string const ( - ISBSvcPhaseUnknown ISBSvcPhase = "" - ISBSvcPhasePending ISBSvcPhase = "Pending" - ISBSvcPhaseRunning ISBSvcPhase = "Running" - ISBSvcPhaseFailed ISBSvcPhase = "Failed" + ISBSvcPhaseUnknown ISBSvcPhase = "" + ISBSvcPhasePending ISBSvcPhase = "Pending" + ISBSvcPhaseRunning ISBSvcPhase = "Running" + ISBSvcPhaseFailed ISBSvcPhase = "Failed" + ISBSvcPhaseDeleting ISBSvcPhase = "Deleting" // ISBSvcConditionConfigured has the status True when the InterStepBufferService // has valid configuration. @@ -150,7 +151,8 @@ func (iss *InterStepBufferServiceStatus) SetObservedGeneration(value int64) { // IsHealthy indicates whether the InterStepBufferService is healthy or not func (iss *InterStepBufferServiceStatus) IsHealthy() bool { - if iss.Phase != ISBSvcPhaseRunning { + // Deleting is a special case, we don't want to mark it as unhealthy as Pipeline reconciliation relies on it + if iss.Phase != ISBSvcPhaseRunning && iss.Phase != ISBSvcPhaseDeleting { return false } return iss.IsReady() diff --git a/pkg/reconciler/isbsvc/controller.go b/pkg/reconciler/isbsvc/controller.go index 1cddae5d4b..6f3cd212f9 100644 --- a/pkg/reconciler/isbsvc/controller.go +++ b/pkg/reconciler/isbsvc/controller.go @@ -101,6 +101,7 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc // Finalizer logic should be added here. if err := installer.Uninstall(ctx, isbSvc, r.client, r.kubeClient, r.config, log, r.recorder); err != nil { log.Errorw("Failed to uninstall", zap.Error(err)) + isbSvc.Status.SetPhase(dfv1.ISBSvcPhaseDeleting, err.Error()) return err } controllerutil.RemoveFinalizer(isbSvc, finalizerName) diff --git a/pkg/reconciler/isbsvc/installer/installer.go b/pkg/reconciler/isbsvc/installer/installer.go index e1730416b6..0c295b9dc6 100644 --- a/pkg/reconciler/isbsvc/installer/installer.go +++ b/pkg/reconciler/isbsvc/installer/installer.go @@ -91,6 +91,13 @@ func getInstaller(isbSvc *dfv1.InterStepBufferService, client client.Client, kub // // It could also be used to check if the ISB Service object can be safely deleted. func Uninstall(ctx context.Context, isbSvc *dfv1.InterStepBufferService, client client.Client, kubeClient kubernetes.Interface, config *reconciler.GlobalConfig, logger *zap.SugaredLogger, recorder record.EventRecorder) error { + pls, err := referencedPipelines(ctx, client, isbSvc) + if err != nil { + return fmt.Errorf("failed to check if there is any pipeline using this InterStepBufferService, %w", err) + } + if pls > 0 { + return fmt.Errorf("can not delete InterStepBufferService %q which has %d pipelines connected", isbSvc.Name, pls) + } installer, err := getInstaller(isbSvc, client, kubeClient, config, logger, recorder) if err != nil { logger.Errorw("Failed to get an installer", zap.Error(err)) @@ -98,3 +105,23 @@ func Uninstall(ctx context.Context, isbSvc *dfv1.InterStepBufferService, client } return installer.Uninstall(ctx) } + +func referencedPipelines(ctx context.Context, c client.Client, isbSvc *dfv1.InterStepBufferService) (int, error) { + pipelines := &dfv1.PipelineList{} + if err := c.List(ctx, pipelines, &client.ListOptions{ + Namespace: isbSvc.Namespace, + }); err != nil { + return 0, err + } + result := 0 + for _, pl := range pipelines.Items { + isbSvcName := pl.Spec.InterStepBufferServiceName + if isbSvcName == "" { + isbSvcName = dfv1.DefaultISBSvcName + } + if isbSvcName == isbSvc.Name { + result++ + } + } + return result, nil +} diff --git a/pkg/reconciler/isbsvc/installer/installer_test.go b/pkg/reconciler/isbsvc/installer/installer_test.go index ce6e5dc124..7faeb6b9f3 100644 --- a/pkg/reconciler/isbsvc/installer/installer_test.go +++ b/pkg/reconciler/isbsvc/installer/installer_test.go @@ -250,4 +250,71 @@ func TestUnInstall(t *testing.T) { err := Uninstall(ctx, testObj, cl, kubeClient, fakeConfig, zaptest.NewLogger(t).Sugar(), record.NewFakeRecorder(64)) assert.NoError(t, err) }) + + t.Run("test has pl connected", func(t *testing.T) { + testPipeline := &dfv1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pl", + Namespace: testNamespace, + }, + Spec: dfv1.PipelineSpec{ + InterStepBufferServiceName: testISBSName, + }, + } + err := cl.Create(ctx, testPipeline) + assert.NoError(t, err) + testObj := testJetStreamIsbSvc.DeepCopy() + err = Uninstall(ctx, testObj, cl, kubeClient, fakeConfig, zaptest.NewLogger(t).Sugar(), record.NewFakeRecorder(64)) + assert.Error(t, err) + assert.Contains(t, err.Error(), "connected") + }) +} + +func Test_referencedPipelines(t *testing.T) { + cl := fake.NewClientBuilder().Build() + ctx := context.TODO() + + t.Run("test no referenced pls", func(t *testing.T) { + testObj := testJetStreamIsbSvc.DeepCopy() + pls, err := referencedPipelines(ctx, cl, testObj) + assert.NoError(t, err) + assert.Equal(t, 0, pls) + }) + + t.Run("test having referenced pls - non default isbsvc", func(t *testing.T) { + testPipeline := &dfv1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pl", + Namespace: testNamespace, + }, + Spec: dfv1.PipelineSpec{ + InterStepBufferServiceName: testISBSName, + }, + } + err := cl.Create(ctx, testPipeline) + assert.NoError(t, err) + testObj := testJetStreamIsbSvc.DeepCopy() + pls, err := referencedPipelines(ctx, cl, testObj) + assert.NoError(t, err) + assert.Equal(t, 1, pls) + }) + + t.Run("test having referenced pls - default isbsvc", func(t *testing.T) { + testPipeline := &dfv1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pl-1", + Namespace: testNamespace, + }, + Spec: dfv1.PipelineSpec{ + InterStepBufferServiceName: "", + }, + } + err := cl.Create(ctx, testPipeline) + assert.NoError(t, err) + testObj := testJetStreamIsbSvc.DeepCopy() + testObj.Name = "default" + pls, err := referencedPipelines(ctx, cl, testObj) + assert.NoError(t, err) + assert.Equal(t, 1, pls) + }) }