diff --git a/apis/placement/v1alpha1/common.go b/apis/placement/v1alpha1/common.go
index 88023cf63..0c69e50fd 100644
--- a/apis/placement/v1alpha1/common.go
+++ b/apis/placement/v1alpha1/common.go
@@ -12,9 +12,18 @@ const (
 	// ResourceOverrideKind is the kind of the ResourceOverride.
 	ResourceOverrideKind = "ResourceOverride"
 
-	// ResourceOverrideSnapshotKind is the kind of the ResourceOverrideSnapshotKind.
+	// ResourceOverrideSnapshotKind is the kind of the ResourceOverrideSnapshot.
 	ResourceOverrideSnapshotKind = "ResourceOverrideSnapshot"
 
+	// ClusterStagedUpdateRunKind is the kind of the ClusterStagedUpdateRun.
+	ClusterStagedUpdateRunKind = "ClusterStagedUpdateRun"
+
+	// ClusterStagedUpdateStrategyKind is the kind of the ClusterStagedUpdateStrategy.
+	ClusterStagedUpdateStrategyKind = "ClusterStagedUpdateStrategy"
+
+	// ClusterApprovalRequestKind is the kind of the ClusterApprovalRequest.
+	ClusterApprovalRequestKind = "ClusterApprovalRequest"
+
 	// ClusterStagedUpdateRunFinalizer is used by the ClusterStagedUpdateRun controller to make sure that the ClusterStagedUpdateRun
 	// object is not deleted until all its dependent resources are deleted.
 	ClusterStagedUpdateRunFinalizer = fleetPrefix + "stagedupdaterun-finalizer"
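
These kind constants are paired with the API group's GroupVersion to build the GroupVersionKinds that the hub agent checks for at startup (see the setup.go hunk below). A minimal standalone illustration, assuming the package is importable as go.goms.io/fleet/apis/placement/v1alpha1:

package example

import (
	placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1"
)

// The GVK for ClusterStagedUpdateRun, built the same way setup.go assembles its
// clusterStagedUpdateRunGVKs list for the CRD-installed check.
var clusterStagedUpdateRunGVK = placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterStagedUpdateRunKind)
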
diff --git a/charts/hub-agent/templates/deployment.yaml b/charts/hub-agent/templates/deployment.yaml
index 1a7a98037..7d25f1775 100644
--- a/charts/hub-agent/templates/deployment.yaml
+++ b/charts/hub-agent/templates/deployment.yaml
@@ -31,6 +31,7 @@ spec:
             - --enable-v1alpha1-apis={{ .Values.enableV1Alpha1APIs }}
             - --enable-v1beta1-apis={{ .Values.enableV1Beta1APIs }}
             - --enable-cluster-inventory-apis={{ .Values.enableClusterInventoryAPI }}
+            - --enable-staged-update-run-apis={{ .Values.enableStagedUpdateRunAPIs }}
             - --max-concurrent-cluster-placement={{ .Values.MaxConcurrentClusterPlacement }}
             - --concurrent-resource-change-syncs={{ .Values.ConcurrentResourceChangeSyncs }}
             - --log_file_max_size={{ .Values.logFileMaxSize }}
diff --git a/charts/hub-agent/values.yaml b/charts/hub-agent/values.yaml
index 8efe14714..777870a3d 100644
--- a/charts/hub-agent/values.yaml
+++ b/charts/hub-agent/values.yaml
@@ -36,6 +36,7 @@ affinity: {}
 enableV1Alpha1APIs: false
 enableV1Beta1APIs: true
 enableClusterInventoryAPI: true
+enableStagedUpdateRunAPIs: true
 
 hubAPIQPS: 250
 hubAPIBurst: 1000
diff --git a/cmd/hubagent/options/options.go b/cmd/hubagent/options/options.go
index b5f728bfd..ec680a58e 100644
--- a/cmd/hubagent/options/options.go
+++ b/cmd/hubagent/options/options.go
@@ -83,6 +83,8 @@ type Options struct {
 	EnableClusterInventoryAPIs bool
 	// ForceDeleteWaitTime is the duration the hub agent waits before force deleting a member cluster.
 	ForceDeleteWaitTime metav1.Duration
+	// EnableStagedUpdateRunAPIs enables the agents to watch the ClusterStagedUpdateRun CRs.
+	EnableStagedUpdateRunAPIs bool
 }
 
 // NewOptions builds an empty options.
@@ -99,6 +101,7 @@ func NewOptions() *Options {
 		MaxFleetSizeSupported:         100,
 		EnableV1Alpha1APIs:            false,
 		EnableClusterInventoryAPIs:    false,
+		EnableStagedUpdateRunAPIs:     false,
 	}
 }
 
@@ -140,6 +143,7 @@ func (o *Options) AddFlags(flags *flag.FlagSet) {
 	flags.BoolVar(&o.EnableV1Beta1APIs, "enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
 	flags.BoolVar(&o.EnableClusterInventoryAPIs, "enable-cluster-inventory-apis", false, "If set, the agents will watch for the ClusterInventory APIs.")
 	flags.DurationVar(&o.ForceDeleteWaitTime.Duration, "force-delete-wait-time", 15*time.Minute, "The duration the hub agent waits before force deleting a member cluster.")
+	flags.BoolVar(&o.EnableStagedUpdateRunAPIs, "enable-staged-update-run-apis", false, "If set, the agents will watch for the ClusterStagedUpdateRun APIs.")
 
 	o.RateLimiterOpts.AddFlags(flags)
 }
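
The new option defaults to false and is surfaced through the Helm chart above. A minimal sketch of how the flag parses into Options (standalone example, assuming the package import path go.goms.io/fleet/cmd/hubagent/options):

package main

import (
	"flag"
	"fmt"

	"go.goms.io/fleet/cmd/hubagent/options"
)

func main() {
	fs := flag.NewFlagSet("hubagent", flag.ExitOnError)
	opts := options.NewOptions()
	opts.AddFlags(fs)

	// Operators opt in explicitly; the chart passes --enable-staged-update-run-apis from values.yaml.
	_ = fs.Parse([]string{"--enable-staged-update-run-apis=true"})
	fmt.Println(opts.EnableStagedUpdateRunAPIs) // true
}
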
diff --git a/cmd/hubagent/workload/setup.go b/cmd/hubagent/workload/setup.go
index e5c2d0527..4b6e7d2a9 100644
--- a/cmd/hubagent/workload/setup.go
+++ b/cmd/hubagent/workload/setup.go
@@ -35,6 +35,7 @@ import (
 	"go.goms.io/fleet/pkg/controllers/overrider"
 	"go.goms.io/fleet/pkg/controllers/resourcechange"
 	"go.goms.io/fleet/pkg/controllers/rollout"
+	"go.goms.io/fleet/pkg/controllers/updaterun"
 	"go.goms.io/fleet/pkg/controllers/workgenerator"
 	"go.goms.io/fleet/pkg/resourcewatcher"
 	"go.goms.io/fleet/pkg/scheduler"
@@ -85,13 +86,20 @@ var (
 		placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ResourceOverrideSnapshotKind),
 	}
 
+	clusterStagedUpdateRunGVKs = []schema.GroupVersionKind{
+		placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterStagedUpdateRunKind),
+		placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterStagedUpdateStrategyKind),
+		placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ClusterApprovalRequestKind),
+	}
+
 	clusterInventoryGVKs = []schema.GroupVersionKind{
 		clusterinventory.GroupVersion.WithKind("ClusterProfile"),
 	}
 )
 
 // SetupControllers set up the customized controllers we developed
-func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, config *rest.Config, opts *options.Options) error {
+func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, config *rest.Config, opts *options.Options) error { //nolint:gocyclo
+	// TODO: Try to reduce the cyclomatic complexity of this function, last measured at 33 (the linter fails at > 30), and remove the //nolint:gocyclo directive.
 	dynamicClient, err := dynamic.NewForConfig(config)
 	if err != nil {
 		klog.ErrorS(err, "unable to create the dynamic client")
@@ -195,7 +203,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 			return err
 		}
 
-		// Set up  a new controller to do rollout resources according to CRP rollout strategy
+		// Set up a new controller to do rollout resources according to CRP rollout strategy
 		klog.Info("Setting up rollout controller")
 		if err := (&rollout.Reconciler{
 			Client:                  mgr.GetClient(),
@@ -215,6 +223,24 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 			return err
 		}
 
+		// Set up the staged update run controller, which rolls out resources to clusters stage by stage.
+		if opts.EnableStagedUpdateRunAPIs {
+			for _, gvk := range clusterStagedUpdateRunGVKs {
+				if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
+					klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
+					return err
+				}
+			}
+			klog.Info("Setting up clusterStagedUpdateRun controller")
+			if err = (&updaterun.Reconciler{
+				Client:          mgr.GetClient(),
+				InformerManager: dynamicInformerManager,
+			}).SetupWithManager(mgr); err != nil {
+				klog.ErrorS(err, "unable to set up clusterStagedUpdateRun controller")
+				return err
+			}
+		}
+
 		// Set up the work generator
 		klog.Info("Setting up work generator")
 		if err := (&workgenerator.Reconciler{
@@ -327,7 +353,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 		}
 	}
 
-	// Set up  a new controller to reconcile any resources in the cluster
+	// Set up a new controller to reconcile any resources in the cluster
 	klog.Info("Setting up resource change controller")
 	rcr := &resourcechange.Reconciler{
 		DynamicClient:               dynamicClient,
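
The controller is registered only when the feature flag is set and all required CRDs are served by the API server. As a rough idea of what a discovery-based presence check involves (a hypothetical sketch; Fleet's utils.CheckCRDInstalled is the helper actually used above and may differ):

package example

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
)

// isKindServed asks the API server which resources it serves for the GVK's group/version
// and looks for the kind, so the hub agent can refuse to start a controller whose CRDs
// are not installed.
func isKindServed(dc discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (bool, error) {
	resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
	if err != nil {
		return false, err // the group/version is not served at all, or discovery failed
	}
	for _, r := range resources.APIResources {
		if r.Kind == gvk.Kind {
			return true, nil
		}
	}
	return false, nil
}
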
diff --git a/examples/stagedupdaterun/clusterStagedUpdateRun.yaml b/examples/stagedupdaterun/clusterStagedUpdateRun.yaml
index a080da530..3e96a1280 100644
--- a/examples/stagedupdaterun/clusterStagedUpdateRun.yaml
+++ b/examples/stagedupdaterun/clusterStagedUpdateRun.yaml
@@ -4,10 +4,10 @@ metadata:
   name: example-run
 spec:
   placementName: example-placement
-  resourceSnapshotIndex: "1"
+  resourceSnapshotIndex: example-placement-0-snapshot
   stagedRolloutStrategyName: example-strategy
 status:
-  policySnapshotIndexUsed: "1"
+  policySnapshotIndexUsed: example-placement-0
   policyObservedClusterCount: 3
   appliedStrategy:
     type: Immediate
diff --git a/pkg/controllers/updaterun/controller.go b/pkg/controllers/updaterun/controller.go
index 5f6c23d72..10a993bf4 100644
--- a/pkg/controllers/updaterun/controller.go
+++ b/pkg/controllers/updaterun/controller.go
@@ -133,7 +133,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
 	// Execute the updateRun.
 	klog.V(2).InfoS("Continue to execute the clusterStagedUpdateRun", "updatingStageIndex", updatingStageIndex, "clusterStagedUpdateRun", runObjRef)
 	finished, waitTime, execErr := r.execute(ctx, &updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
-	if execErr != nil && errors.Is(execErr, errStagedUpdatedAborted) {
+	if errors.Is(execErr, errStagedUpdatedAborted) {
 		// errStagedUpdatedAborted cannot be retried.
 		return runtime.Result{}, r.recordUpdateRunFailed(ctx, &updateRun, execErr.Error())
 	}
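
The dropped execErr != nil guard is safe because errors.Is never matches a nil error against a non-nil sentinel; a minimal standalone illustration:

package main

import (
	"errors"
	"fmt"
)

var errStagedUpdatedAborted = errors.New("cannot continue the staged update run")

func main() {
	// errors.Is(nil, sentinel) is always false, so the nil check added no information.
	fmt.Println(errors.Is(nil, errStagedUpdatedAborted)) // false
	// A wrapped sentinel still matches, which is what the %w wrapping in execution.go relies on.
	fmt.Println(errors.Is(fmt.Errorf("aborted: %w", errStagedUpdatedAborted), errStagedUpdatedAborted)) // true
}
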
diff --git a/pkg/controllers/updaterun/controller_integration_test.go b/pkg/controllers/updaterun/controller_integration_test.go
index 1f655d539..d22077d85 100644
--- a/pkg/controllers/updaterun/controller_integration_test.go
+++ b/pkg/controllers/updaterun/controller_integration_test.go
@@ -268,7 +268,7 @@ func generateTestClusterSchedulingPolicySnapshot(idx int) *placementv1beta1.Clus
 	}
 }
 
-func generateTestClusterResourceBinding(policySnapshotName, targetCluster string) *placementv1beta1.ClusterResourceBinding {
+func generateTestClusterResourceBinding(policySnapshotName, targetCluster string, state placementv1beta1.BindingState) *placementv1beta1.ClusterResourceBinding {
 	binding := &placementv1beta1.ClusterResourceBinding{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "binding-" + testResourceSnapshotName + "-" + targetCluster,
@@ -277,7 +277,7 @@ func generateTestClusterResourceBinding(policySnapshotName, targetCluster string
 			},
 		},
 		Spec: placementv1beta1.ResourceBindingSpec{
-			State:                        placementv1beta1.BindingStateScheduled,
+			State:                        state,
 			TargetCluster:                targetCluster,
 			SchedulingPolicySnapshotName: policySnapshotName,
 		},
diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go
index 77c02fc9c..544bdf180 100644
--- a/pkg/controllers/updaterun/execution.go
+++ b/pkg/controllers/updaterun/execution.go
@@ -115,6 +115,9 @@ func (r *Reconciler) executeUpdatingStage(
 					return 0, controller.NewUpdateIgnoreConflictError(err)
 				}
 				klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
+				if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
+					return 0, err
+				}
 			} else {
 				klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
 				if binding.Spec.State != placementv1beta1.BindingStateBound {
@@ -124,6 +127,14 @@ func (r *Reconciler) executeUpdatingStage(
 						return 0, controller.NewUpdateIgnoreConflictError(err)
 					}
 					klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
+					if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
+						return 0, err
+					}
+				} else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
+					klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
+					if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
+						return 0, err
+					}
 				} else {
 					if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
 						return clusterUpdatingWaitTime, updateErr
@@ -139,8 +150,10 @@ func (r *Reconciler) executeUpdatingStage(
 		}
 
 		// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
-		if !isBindingSyncedWithClusterStatus(updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound {
-			unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v", clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec))
+		if !isBindingSyncedWithClusterStatus(updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound ||
+			!condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
+			unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v, condition: %+v",
+				clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec, binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted))))
 			klog.ErrorS(unexpectedErr, "The binding has been changed during updating, please check if there's concurrent clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
 			markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
 			return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
@@ -314,6 +327,26 @@ func (r *Reconciler) checkAfterStageTasksStatus(ctx context.Context, updatingSta
 	return true, nil
 }
 
+// updateBindingRolloutStarted updates the binding status to indicate the rollout has started.
+func (r *Reconciler) updateBindingRolloutStarted(ctx context.Context, binding *placementv1beta1.ClusterResourceBinding, updateRun *placementv1alpha1.ClusterStagedUpdateRun) error {
+	// Remove the existing condition first so that the re-added condition carries a fresh lastTransitionTime.
+	binding.RemoveCondition(string(placementv1beta1.ResourceBindingRolloutStarted))
+	cond := metav1.Condition{
+		Type:               string(placementv1beta1.ResourceBindingRolloutStarted),
+		Status:             metav1.ConditionTrue,
+		ObservedGeneration: binding.Generation,
+		Reason:             condition.RolloutStartedReason,
+		Message:            fmt.Sprintf("Detected the new changes on the resources and started the rollout process, resourceSnapshotName: %s, clusterStagedUpdateRun: %s", updateRun.Spec.ResourceSnapshotIndex, updateRun.Name),
+	}
+	binding.SetConditions(cond)
+	if err := r.Client.Status().Update(ctx, binding); err != nil {
+		klog.ErrorS(err, "Failed to update binding status", "clusterResourceBinding", klog.KObj(binding), "condition", cond)
+		return controller.NewUpdateIgnoreConflictError(err)
+	}
+	klog.V(2).InfoS("Updated binding as rolloutStarted", "clusterResourceBinding", klog.KObj(binding), "condition", cond)
+	return nil
+}
+
 // isBindingSyncedWithClusterStatus checks if the binding is up-to-date with the cluster status.
 func isBindingSyncedWithClusterStatus(updateRun *placementv1alpha1.ClusterStagedUpdateRun, binding *placementv1beta1.ClusterResourceBinding, cluster *placementv1alpha1.ClusterUpdatingStatus) bool {
 	if binding.Spec.ResourceSnapshotName != updateRun.Spec.ResourceSnapshotIndex {
@@ -335,8 +368,8 @@ func isBindingSyncedWithClusterStatus(updateRun *placementv1alpha1.ClusterStaged
 	return true
 }
 
-// checkClusterUpdateResult checks if the cluster has been updated successfully.
-// It returns if the cluster has been updated successfully and the error if the cluster update failed.
+// checkClusterUpdateResult checks if the resources have been updated successfully on a given cluster.
+// It returns true if the resources have been updated successfully, or an error if the update failed.
 func checkClusterUpdateResult(
 	binding *placementv1beta1.ClusterResourceBinding,
 	clusterStatus *placementv1alpha1.ClusterUpdatingStatus,
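
The rolloutStarted checks above require both a True status and a matching observedGeneration, so a binding whose spec was just bumped by the scheduler is not mistaken for one that already started rolling out. A minimal sketch of that generation-aware check over standard metav1.Conditions (Fleet's condition.IsConditionStatusTrue is the helper used in the diff; this is only an illustration):

package example

import (
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isTrueForGeneration reports whether the named condition is True and was observed at the
// object's current generation; a stale observedGeneration means the status has not yet
// caught up with the latest spec change.
func isTrueForGeneration(conds []metav1.Condition, condType string, generation int64) bool {
	cond := meta.FindStatusCondition(conds, condType)
	return cond != nil && cond.Status == metav1.ConditionTrue && cond.ObservedGeneration == generation
}
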
diff --git a/pkg/controllers/updaterun/execution_integration_test.go b/pkg/controllers/updaterun/execution_integration_test.go
index 0ed3f4bea..2eef72130 100644
--- a/pkg/controllers/updaterun/execution_integration_test.go
+++ b/pkg/controllers/updaterun/execution_integration_test.go
@@ -64,14 +64,14 @@ var _ = Describe("UpdateRun execution tests", func() {
 			}
 			// reverse the order of the clusters by index
 			targetClusters[i] = generateTestMemberCluster(numTargetClusters-1-i, "cluster-"+strconv.Itoa(i), map[string]string{"group": "prod", "region": region})
-			resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name)
+			resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name, placementv1beta1.BindingStateScheduled)
 		}
 
 		unscheduledCluster = make([]*clusterv1beta1.MemberCluster, numUnscheduledClusters)
 		for i := range unscheduledCluster {
 			unscheduledCluster[i] = generateTestMemberCluster(i, "unscheduled-cluster-"+strconv.Itoa(i), map[string]string{"group": "staging"})
 			// update the policySnapshot name so that these clusters are considered to-be-deleted
-			resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name)
+			resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name, placementv1beta1.BindingStateUnscheduled)
 		}
 
 		var err error
@@ -215,7 +215,7 @@ var _ = Describe("UpdateRun execution tests", func() {
 			validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
 		})
 
-		It("Should mark the 3rd cluster in the 1st stage as succeeded", func() {
+		It("Should mark the 3rd cluster in the 1st stage as succeeded after marking the binding available", func() {
 			By("Validating the 3rd clusterResourceBinding is updated to Bound")
 			binding := resourceBindings[numTargetClusters-5] // cluster-5
 			validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0)
@@ -494,6 +494,11 @@ func validateBindingState(ctx context.Context, binding *placementv1beta1.Cluster
 		if diff := cmp.Diff(binding.Spec.ApplyStrategy, updateRun.Status.ApplyStrategy); diff != "" {
 			return fmt.Errorf("binding %s has different applyStrategy (-want +got):\n%s", binding.Name, diff)
 		}
+
+		rolloutStartedCond := binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted))
+		if !condition.IsConditionStatusTrue(rolloutStartedCond, binding.Generation) {
+			return fmt.Errorf("binding %s does not have a true RolloutStarted condition at the current generation", binding.Name)
+		}
 		return nil
 	}, timeout, interval).Should(Succeed(), "failed to validate the binding state")
 }
diff --git a/pkg/controllers/updaterun/initialization.go b/pkg/controllers/updaterun/initialization.go
index ec0da021a..5db90a40f 100644
--- a/pkg/controllers/updaterun/initialization.go
+++ b/pkg/controllers/updaterun/initialization.go
@@ -183,6 +183,12 @@ func (r *Reconciler) collectScheduledClusters(
 			klog.V(2).InfoS("Found a scheduled binding", "binding", binding.Name, "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
 			selectedBindings = append(selectedBindings, &bindingList.Items[i])
 		} else {
+			if binding.Spec.State != placementv1beta1.BindingStateUnscheduled {
+				stateErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("binding `%s` with old policy snapshot %s has state %s, not unscheduled", binding.Name, binding.Spec.SchedulingPolicySnapshotName, binding.Spec.State))
+				klog.ErrorS(stateErr, "Failed to collect clusterResourceBindings", "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
+				// no more retries here.
+				return nil, nil, fmt.Errorf("%w: %s", errInitializedFailed, stateErr.Error())
+			}
 			klog.V(2).InfoS("Found a to-be-deleted binding", "binding", binding.Name, "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
 			toBeDeletedBindings = append(toBeDeletedBindings, &bindingList.Items[i])
 		}
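
With this guard, collectScheduledClusters effectively partitions the bindings three ways: bindings on the latest policy snapshot are selected for the update run, stale bindings in the Unscheduled state are queued for deletion, and any other stale binding aborts initialization because it suggests the scheduler is still reshuffling placements. A condensed sketch of that decision (hypothetical helper, assuming the v1beta1 placement API package path and that the selection key is the scheduling policy snapshot name):

package example

import (
	"fmt"

	placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// classifyBinding mirrors the decision above: select bindings on the latest policy snapshot,
// mark stale Unscheduled bindings for deletion, and reject anything else as unexpected.
func classifyBinding(b *placementv1beta1.ClusterResourceBinding, latestPolicySnapshot string) (selected, toDelete bool, err error) {
	if b.Spec.SchedulingPolicySnapshotName == latestPolicySnapshot {
		return true, false, nil
	}
	if b.Spec.State == placementv1beta1.BindingStateUnscheduled {
		return false, true, nil
	}
	return false, false, fmt.Errorf("binding %s with old policy snapshot %s has state %s, not unscheduled",
		b.Name, b.Spec.SchedulingPolicySnapshotName, b.Spec.State)
}
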
diff --git a/pkg/controllers/updaterun/initialization_integration_test.go b/pkg/controllers/updaterun/initialization_integration_test.go
index 0f86f5d1a..d849c4ec1 100644
--- a/pkg/controllers/updaterun/initialization_integration_test.go
+++ b/pkg/controllers/updaterun/initialization_integration_test.go
@@ -76,14 +76,14 @@ var _ = Describe("Updaterun initialization tests", func() {
 			}
 			// reverse the order of the clusters by index
 			targetClusters[i] = generateTestMemberCluster(numTargetClusters-1-i, "cluster-"+strconv.Itoa(i), map[string]string{"group": "prod", "region": region})
-			resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name)
+			resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name, placementv1beta1.BindingStateScheduled)
 		}
 
 		unscheduledCluster = make([]*clusterv1beta1.MemberCluster, numUnscheduledClusters)
 		for i := range unscheduledCluster {
 			unscheduledCluster[i] = generateTestMemberCluster(i, "unscheduled-cluster-"+strconv.Itoa(i), map[string]string{"group": "staging"})
 			// update the policySnapshot name so that these clusters are considered to-be-deleted
-			resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name)
+			resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name, placementv1beta1.BindingStateUnscheduled)
 		}
 
 		var err error
@@ -380,7 +380,7 @@ var _ = Describe("Updaterun initialization tests", func() {
 
 		It("Should not report error if there are only to-be-deleted clusters", func() {
 			By("Creating a to-be-deleted clusterResourceBinding")
-			binding := generateTestClusterResourceBinding(policySnapshot.Name+"a", "cluster-0")
+			binding := generateTestClusterResourceBinding(policySnapshot.Name+"a", "cluster-0", placementv1beta1.BindingStateUnscheduled)
 			Expect(k8sClient.Create(ctx, binding)).To(Succeed())
 
 			By("Creating a new clusterStagedUpdateRun")
@@ -391,10 +391,9 @@ var _ = Describe("Updaterun initialization tests", func() {
 			validateFailedInitCondition(ctx, updateRun, "referenced clusterStagedUpdateStrategy not found")
 		})
 
-		It("Should fail to initialize if the bindings are not in Scheduled or Bound state", func() {
+		It("Should fail to initialize if the bindings with the latest policy snapshot are not in Scheduled or Bound state", func() {
 			By("Creating a not scheduled clusterResourceBinding")
-			binding := generateTestClusterResourceBinding(policySnapshot.Name, "cluster-1")
-			binding.Spec.State = placementv1beta1.BindingStateUnscheduled
+			binding := generateTestClusterResourceBinding(policySnapshot.Name, "cluster-1", placementv1beta1.BindingStateUnscheduled)
 			Expect(k8sClient.Create(ctx, binding)).To(Succeed())
 
 			By("Creating a new clusterStagedUpdateRun")
@@ -406,6 +405,21 @@ var _ = Describe("Updaterun initialization tests", func() {
 			By("Deleting the clusterResourceBinding")
 			Expect(k8sClient.Delete(ctx, binding)).Should(Succeed())
 		})
+
+		It("Should fail to initialize if the bindings with old policy snapshots are not in Unscheduled state", func() {
+			By("Creating a scheduled clusterResourceBinding with old policy snapshot")
+			binding := generateTestClusterResourceBinding(policySnapshot.Name+"a", "cluster-0", placementv1beta1.BindingStateScheduled)
+			Expect(k8sClient.Create(ctx, binding)).To(Succeed())
+
+			By("Creating a new clusterStagedUpdateRun")
+			Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
+
+			By("Validating the initialization failed")
+			validateFailedInitCondition(ctx, updateRun, "has state Scheduled, not unscheduled")
+
+			By("Deleting the clusterResourceBinding")
+			Expect(k8sClient.Delete(ctx, binding)).Should(Succeed())
+		})
 	})
 
 	Context("Test generateStagesByStrategy", func() {
diff --git a/pkg/controllers/updaterun/validation_integration_test.go b/pkg/controllers/updaterun/validation_integration_test.go
index 2d2efd03e..b1f960c07 100644
--- a/pkg/controllers/updaterun/validation_integration_test.go
+++ b/pkg/controllers/updaterun/validation_integration_test.go
@@ -63,14 +63,14 @@ var _ = Describe("UpdateRun validation tests", func() {
 			}
 			// reverse the order of the clusters by index
 			targetClusters[i] = generateTestMemberCluster(numTargetClusters-1-i, "cluster-"+strconv.Itoa(i), map[string]string{"group": "prod", "region": region})
-			resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name)
+			resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name, placementv1beta1.BindingStateBound)
 		}
 
 		unscheduledCluster = make([]*clusterv1beta1.MemberCluster, numUnscheduledClusters)
 		for i := range unscheduledCluster {
 			unscheduledCluster[i] = generateTestMemberCluster(i, "unscheduled-cluster-"+strconv.Itoa(i), map[string]string{"group": "staging"})
 			// update the policySnapshot name so that these clusters are considered to-be-deleted
-			resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name)
+			resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name, placementv1beta1.BindingStateUnscheduled)
 		}
 
 		var err error