diff --git a/apis/placement/v1beta1/binding_types.go b/apis/placement/v1beta1/binding_types.go
index f54d0b89e..6ac51c392 100644
--- a/apis/placement/v1beta1/binding_types.go
+++ b/apis/placement/v1beta1/binding_types.go
@@ -48,6 +48,11 @@ type ResourceBindingSpec struct {
 	// it points to the name of the leading snapshot of the index group.
 	ResourceSnapshotName string `json:"resourceSnapshotName"`
 
+	// PolicySnapshotName is the name of the scheduling policy snapshot that this resource binding
+	// points to; more specifically, the scheduler creates this binding in accordance with this
+	// scheduling policy snapshot.
+	PolicySnapshotName string `json:"policySnapshotName"`
+
 	// TargetCluster is the name of the cluster that the scheduler assigns the resources to.
 	TargetCluster string `json:"targetCluster"`
 
diff --git a/apis/placement/v1beta1/zz_generated.deepcopy.go b/apis/placement/v1beta1/zz_generated.deepcopy.go
index 197d7c2d0..ad3612e8e 100644
--- a/apis/placement/v1beta1/zz_generated.deepcopy.go
+++ b/apis/placement/v1beta1/zz_generated.deepcopy.go
@@ -12,7 +12,7 @@ package v1beta1
 
 import (
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/apis/meta/v1"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
 )
diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go
index 779e0c87b..d9f986657 100644
--- a/apis/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/v1alpha1/zz_generated.deepcopy.go
@@ -12,7 +12,7 @@ package v1alpha1
 
 import (
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/apis/meta/v1"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 )
diff --git a/config/crd/bases/placement.karavel.io_clusterresourcebindings.yaml b/config/crd/bases/placement.karavel.io_clusterresourcebindings.yaml
index 1a0265d49..aa2160346 100644
--- a/config/crd/bases/placement.karavel.io_clusterresourcebindings.yaml
+++ b/config/crd/bases/placement.karavel.io_clusterresourcebindings.yaml
@@ -89,6 +89,12 @@ spec:
                 - reason
                 - selected
                 type: object
+              policySnapshotName:
+                description: PolicySnapshotName is the name of the scheduling policy
+                  snapshot that this resource binding points to; more specifically,
+                  the scheduler creates this binding in accordance with this scheduling
+                  policy snapshot.
+                type: string
               resourceSnapshotName:
                 description: ResourceSnapshotName is the name of the resource snapshot
                   that this resource binding points to. If the resources are divided
@@ -105,6 +111,7 @@ spec:
                 type: string
             required:
             - clusterDecision
+            - policySnapshotName
            - resourceSnapshotName
            - state
            - targetCluster
diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go
index 59ab205b6..94d1d541d 100644
--- a/pkg/scheduler/framework/framework.go
+++ b/pkg/scheduler/framework/framework.go
@@ -43,10 +43,8 @@ const (
 	maxClusterDecisionCount = 20
 
 	// The reasons and messages for scheduled conditions.
-	fullyScheduledReason     = "SchedulingCompleted"
-	fullyScheduledMessage    = "all required number of bindings have been created"
-	notFullyScheduledReason  = "Pendingscheduling"
-	notFullyScheduledMessage = "might not have enough bindings created"
+	fullyScheduledReason  = "SchedulingCompleted"
+	fullyScheduledMessage = "all required number of bindings have been created"
 
 	// pickedByHighestScoreReason is the reason to use for scheduling decision when a cluster is picked as
 	// it has a highest score.
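For illustration, with this change a binding created by the scheduler carries a reference to the scheduling policy snapshot it was produced from, alongside the existing fields. A minimal sketch of such an object, reusing names from the test fixtures in this diff (the unique name is hypothetical; the snippet assumes the fleetv1beta1 and metav1 imports of the surrounding packages and is not part of the patch):

	binding := fleetv1beta1.ClusterResourceBinding{
		ObjectMeta: metav1.ObjectMeta{
			// Hypothetical name; the real one is generated via uniquename.ClusterResourceBindingUniqueName.
			Name: "test-placement-bravelion-1",
		},
		Spec: fleetv1beta1.ResourceBindingSpec{
			State:              fleetv1beta1.BindingStateCreating,
			PolicySnapshotName: "test-policy", // the policy snapshot the scheduler acted on
			TargetCluster:      "bravelion",
			ClusterDecision: fleetv1beta1.ClusterDecision{
				ClusterName: "bravelion",
				Selected:    true,
			},
			// ResourceSnapshotName is left empty; per crossReferencePickedCustersAndBindings below,
			// another controller fulfills that field.
		},
	}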
@@ -187,12 +185,10 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 		klog.V(2).InfoS("Scheduling cycle ends", "schedulingPolicySnapshot", schedulingPolicySnapshotRef, "latency", latency)
 	}()
 
-	errorMessage := "failed to run scheduling cycle"
-
 	// Retrieve the desired number of clusters from the policy.
 	numOfClusters, err := utils.ExtractNumOfClustersFromPolicySnapshot(policy)
 	if err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to extract number of clusters required from policy snapshot", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, err
 	}
 
@@ -203,7 +199,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// changes eventually.
 	clusters, err := f.collectClusters(ctx)
 	if err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to collect clusters", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, err
 	}
 
@@ -222,7 +218,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// TO-DO (chenyu1): explore the possbilities of using a mutation cache for better performance.
 	bindings, err := f.collectBindings(ctx, crpName)
 	if err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to collect bindings", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, err
 	}
 
@@ -232,45 +228,67 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// have been cleared for processing by the dispatcher; and
 	// * creating bindings, i.e., bindings that have been associated with a normally operating cluster,
 	// but have not yet been cleared for processing by the dispatcher; and
-	// * obsolete bindings, i.e., bindings that are no longer associated with a normally operating
-	// cluster, but have not yet been marked as deleting by the scheduler; and
+	// * obsolete bindings, i.e., bindings that are scheduled in accordance with an out-of-date
+	// (i.e., no longer active) scheduling policy snapshot; they may or may not have been cleared for
+	// processing by the dispatcher; and
+	// * dangling bindings, i.e., bindings that are associated with a cluster that is no longer
+	// in a normally operating state (the cluster has left the fleet, or is in the state of leaving),
+	// yet have not been marked as deleting by the scheduler; and
 	// * deleted bindings, i.e, bindings that are marked for deletion in the API server, but have not
 	// yet been marked as deleting by the scheduler.
 	//
-	// Note that deleting bindings without the scheduler finalizer are ignored by the scheduler, as they
+	// Note that bindings marked as deleting are ignored by the scheduler (unless they are already marked
+	// for deletion and still have the scheduler cleanup finalizer), as they
 	// are irrelevant to the scheduling cycle.
-	active, creating, obsolete, deleted := classifyBindings(bindings, clusters)
+	active, creating, obsolete, dangling, deleted := classifyBindings(policy, bindings, clusters)
 
 	// If a binding has been marked for deletion yet still has the scheduler cleanup finalizer,
 	// remove the finalizer from it to clear it for GC.
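The five buckets above can be made concrete with a rough sketch of the classification order implemented in classifyBindings (frameworkutils.go, further down in this diff); this is illustrative only, not the patch itself, and clusterGoneOrLeaving stands in for the cluster-map lookup:

	switch {
	case binding.DeletionTimestamp != nil:
		// Deleted: processed only if the scheduler cleanup finalizer is still present; otherwise ignored.
	case binding.Spec.State == fleetv1beta1.BindingStateDeleting:
		// Ignored: the scheduler has already marked the binding as deleting.
	case clusterGoneOrLeaving(binding.Spec.TargetCluster):
		// Dangling: the target cluster has left the fleet or is leaving; to be marked as deleting.
	case binding.Spec.PolicySnapshotName != policy.Name:
		// Obsolete: bound under an out-of-date policy snapshot; kept for cross-referencing,
		// unless downscaling removes it early.
	case binding.Spec.State == fleetv1beta1.BindingStateCreating:
		// Creating.
	default:
		// Active.
	}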
 	if err := f.removeSchedulerCleanupFinalizerFrom(ctx, deleted); err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to remove scheduler cleanup finalizer from deleted bindings", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, err
 	}
 
-	// Mark all obsolete bindings as deleting.
-	if err := f.markAsDeletingFor(ctx, obsolete); err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+	// Mark all dangling bindings as deleting.
+	if err := f.markAsDeletingFor(ctx, dangling); err != nil {
+		klog.ErrorS(err, "failed to mark dangling bindings as deleting", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, err
 	}
 
-	// Check if the scheduler should downscale, i.e., mark some creating/active bindings as deleting.
+	// Check if the scheduler should downscale, i.e., mark some creating/active bindings as deleting and/or
+	// clean up all obsolete bindings right away.
+	//
+	// Normally obsolete bindings are kept for cross-referencing at the end of the scheduling cycle to minimize
+	// interruptions caused by scheduling policy change; however, in the case of downscaling, they can be removed
+	// right away.
 	//
-	// Note that the scheduler will only downscale when
+	// To summarize, the scheduler will only downscale when
 	//
 	// * the scheduling policy is of the PickN type; and
 	// * currently there are too many selected clusters, or more specifically too many creating and active bindings
+	// in the system; or there are exactly the right number of selected clusters, but some obsolete bindings still linger
 	// in the system.
-	act, downscaleCount := shouldDownscale(policy, numOfClusters, len(active)+len(creating))
+	act, downscaleCount := shouldDownscale(policy, numOfClusters, len(active)+len(creating), len(obsolete))
 
 	// Downscale if needed.
 	//
-	// To minimize interruptions, the scheduler picks creating bindings first (in any order); if there are still more
-	// bindings to trim, the scheduler prefers ones with smaller CreationTimestamps.
+	// To minimize interruptions, the scheduler picks creating bindings first, and then
+	// active bindings; when processing active bindings, the logic prioritizes older bindings, i.e., bindings with
+	// smaller CreationTimestamp (assuming a monotonic clock in the system).
+	//
+	// This step will also mark all obsolete bindings (if any) as deleting right away.
 	if act {
+		klog.V(2).InfoS("Downscaling is needed", "schedulingPolicySnapshot", schedulingPolicySnapshotRef, "downscaleCount", downscaleCount)
+
+		// Mark all obsolete bindings as deleting first.
+		if err := f.markAsDeletingFor(ctx, obsolete); err != nil {
+			klog.ErrorS(err, "failed to mark obsolete bindings as deleting", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
+			return ctrl.Result{}, err
+		}
+
+		// Perform actual downscaling; this will be skipped if the downscale count is zero.
 		active, creating, err = f.downscale(ctx, active, creating, downscaleCount)
 		if err != nil {
-			klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+			klog.ErrorS(err, "failed to downscale", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 			return ctrl.Result{}, err
 		}
 
@@ -281,13 +299,12 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 		// In the case of downscaling, the scheduler considers the policy to be fully scheduled.
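As a worked example of the downscaling rule above (hypothetical numbers, assuming a PickN policy snapshot that requests 3 clusters; not part of the patch): in both cases below, every obsolete binding is marked as deleting before downscale runs.

	// 4 active+creating bindings, 2 obsolete bindings: one binding is trimmed
	// (creating bindings first, then the oldest active ones).
	act, count := shouldDownscale(policy, 3, 4, 2) // act == true, count == 1

	// Exactly 3 active+creating bindings, 2 obsolete bindings: nothing to trim,
	// but the obsolete bindings are still cleaned up right away.
	act, count = shouldDownscale(policy, 3, 3, 2) // act == true, count == 0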
 		newSchedulingCondition := fullyScheduledCondition(policy)
 
-		// Update the policy snapshot status; since a downscaling has occurred, this update is always requied, hence
-		// no sameness (no change) checks are necessary.
+		// Update the policy snapshot status.
 		//
 		// Note that the op would fail if the policy snapshot is not the latest, so that consistency is
 		// preserved.
 		if err := f.updatePolicySnapshotStatus(ctx, policy, refreshedSchedulingDecisions, newSchedulingCondition); err != nil {
-			klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+			klog.ErrorS(err, "failed to update policy snapshot status during downscaling", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 			return ctrl.Result{}, err
 		}
 
@@ -295,20 +312,15 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 		return ctrl.Result{}, nil
 	}
 
-	// If no downscaling is needed, update the policy snapshot status any way.
-	//
-	// This is needed as a number of situations (e.g., POST/PUT failures) may lead to inconsistencies between
-	// the decisions added to the policy snapshot status and the actual list of bindings.
-
-	// Collect current decisions and conditions for sameness (no change) checks.
-	currentSchedulingDecisions := policy.Status.ClusterDecisions
-	currentSchedulingCondition := meta.FindStatusCondition(policy.Status.Conditions, string(fleetv1beta1.PolicySnapshotScheduled))
-
 	// Check if the scheduler needs to take action; a scheduling cycle is only needed if
 	// * the policy is of the PickAll type; or
 	// * the policy is of the PickN type, and currently there are not enough number of bindings.
 	if !shouldSchedule(policy, numOfClusters, len(active)+len(creating)) {
 		// No action is needed; however, a status refresh might be warranted.
+		//
+		// This is needed as a number of situations (e.g., POST/PUT failures) may lead to inconsistencies between
+		// the decisions added to the policy snapshot status and the actual list of bindings.
+		klog.V(2).InfoS("No scheduling is needed", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 
 		// Refresh scheduling decisions.
 		refreshedSchedulingDecisions := refreshSchedulingDecisionsFrom(policy, active, creating)
@@ -317,45 +329,21 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 		// In this case, since no action is needed, the scheduler considers the policy to be fully scheduled.
 		newSchedulingCondition := fullyScheduledCondition(policy)
 
-		// Check if a refresh is warranted; the scheduler only update the status when there is a change in
-		// scheduling decisions and/or the scheduling condition.
-		if !equalDecisions(currentSchedulingDecisions, refreshedSchedulingDecisions) || !condition.EqualCondition(currentSchedulingCondition, &newSchedulingCondition) {
-			// Update the policy snapshot status.
-			//
-			// Note that the op would fail if the policy snapshot is not the latest, so that consistency is
-			// preserved.
-			if err := f.updatePolicySnapshotStatus(ctx, policy, refreshedSchedulingDecisions, newSchedulingCondition); err != nil {
-				klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
-				return ctrl.Result{}, err
-			}
-		}
-
-		// Return immediate as there no more bindings for the scheduler to schedule at this moment.
-		return ctrl.Result{}, nil
-	}
-
-	// The scheduler needs to take action; refresh the status first.
-
-	// Refresh scheduling decisions.
-	refreshedSchedulingDecisions := refreshSchedulingDecisionsFrom(policy, active, creating)
-	// Prepare new scheduling condition.
-		//
-		// In this case, since action is needed, the scheduler marks the policy as not fully scheduled.
-		newSchedulingCondition := notFullyScheduledCondition(policy, numOfClusters)
-		// Check if a refresh is warranted; the scheduler only update the status when there is a change in
-		// scheduling decisions and/or the scheduling condition.
-		if !equalDecisions(currentSchedulingDecisions, refreshedSchedulingDecisions) || !condition.EqualCondition(currentSchedulingCondition, &newSchedulingCondition) {
 		// Update the policy snapshot status.
 		//
 		// Note that the op would fail if the policy snapshot is not the latest, so that consistency is
 		// preserved.
 		if err := f.updatePolicySnapshotStatus(ctx, policy, refreshedSchedulingDecisions, newSchedulingCondition); err != nil {
-			klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+			klog.ErrorS(err, "failed to update policy snapshot status (no scheduling needed)", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 			return ctrl.Result{}, err
 		}
+
+		// Return immediately as there are no more bindings for the scheduler to schedule at this moment.
+		return ctrl.Result{}, nil
 	}
 
-	// Enter the actual scheduling stages.
+	// The scheduler needs to take action; enter the actual scheduling stages.
+	klog.V(2).InfoS("Scheduling is needed; entering scheduling stages", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 
 	// Prepare the cycle state for this run.
 	//
@@ -365,13 +353,15 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	state := NewCycleState()
 
 	// Calculate the batch size.
+	//
+	// Note that obsolete bindings are not counted.
 	state.desiredBatchSize = int(*policy.Spec.Policy.NumberOfClusters) - len(active) - len(creating)
 
 	// An earlier check guarantees that the desired batch size is always positive; however, the scheduler still
 	// performs a sanity check here; normally this branch will never run.
 	if state.desiredBatchSize <= 0 {
 		err = fmt.Errorf("desired batch size is below zero: %d", state.desiredBatchSize)
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to calculate desired batch size", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
 	}
 
@@ -383,7 +373,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// Note that any failure would lead to the cancellation of the scheduling cycle.
 	batchSizeLimit, status := f.runPostBatchPlugins(ctx, state, policy)
 	if status.IsInteralError() {
-		klog.ErrorS(status.AsError(), errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(status.AsError(), "failed to run post batch plugins", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(status.AsError())
 	}
 
@@ -391,7 +381,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// the batch size limit is never greater than the desired batch size.
 	if batchSizeLimit > state.desiredBatchSize {
 		err := fmt.Errorf("batch size limit is greater than desired batch size: %d > %d", batchSizeLimit, state.desiredBatchSize)
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to set batch size limit", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
 	}
 	state.batchSizeLimit = batchSizeLimit
 
@@ -405,7 +395,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	//
 	// Note that any failure would lead to the cancellation of the scheduling cycle.
 	if status := f.runPreFilterPlugins(ctx, state, policy); status.IsInteralError() {
-		klog.ErrorS(status.AsError(), errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(status.AsError(), "failed to run pre filter plugins", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(status.AsError())
 	}
 
@@ -416,18 +406,15 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// are inspected in parallel.
 	//
 	// Note that any failure would lead to the cancellation of the scheduling cycle.
-	//
-	// TO-DO (chenyu1): assign variables when the implementation is ready.
-	passed, _, err := f.runFilterPlugins(ctx, state, policy, clusters)
+	passed, filtered, err := f.runFilterPlugins(ctx, state, policy, clusters)
 	if err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to run filter plugins", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
 	}
 
 	// Run pre-score plugins.
-	klog.V(4).InfoS("Running pre-score plugins", "policy", schedulingPolicySnapshotRef)
 	if status := f.runPreScorePlugins(ctx, state, policy); status.IsInteralError() {
-		klog.ErrorS(status.AsError(), errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(status.AsError(), "failed to run pre-score plugins", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(status.AsError())
 	}
 
@@ -438,10 +425,9 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	//
 	// Note that at this moment, since no normalization is needed, the addition is performed directly at this step;
 	// when need for normalization materializes, this step should return a list of scores per cluster per plugin instead.
-	klog.V(4).InfoS("Running score plugins", "policy", schedulingPolicySnapshotRef)
 	scoredClusters, err := f.runScorePlugins(ctx, state, policy, passed)
 	if err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to run score plugins", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
 	}
 
@@ -456,7 +442,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// scored clusters.
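Tying the batch-size steps together with hypothetical numbers (an illustration, not part of the patch): for a PickN policy with NumberOfClusters set to 5, 2 active bindings and 1 creating binding, the quantities work out as follows.

	// desiredBatchSize    = NumberOfClusters - active - creating = 5 - 2 - 1 = 2
	// batchSizeLimit      = 1   (returned by a post-batch plugin; it may never exceed desiredBatchSize)
	// clusters passing the filter stage = 3
	// numOfClustersToPick = min(2, 1, 3) = 1   (see calcNumOfClustersToSelect in frameworkutils.go)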
 	if numOfClustersToPick > len(scoredClusters) {
 		err := fmt.Errorf("number of clusters to pick is greater than number of scored clusters: %d > %d", numOfClustersToPick, len(scoredClusters))
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to calculate number of clusters to pick", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
 	}
 
@@ -472,16 +458,19 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// in the current run with the latest score;
 	// * bindings that should be deleted, i.e., mark a binding as deleting if its target cluster is no
 	// longer picked in the current run.
+	// * bindings that require no change; such bindings are already created/updated with the latest
+	// scheduling policy.
 	//
 	// Fields in the returned bindings are fulfilled and/or refreshed as applicable.
-	toCreate, toUpdate, toDelete, err := crossReferencePickedCustersAndBindings(crpName, policy, picked, active, creating)
+	klog.V(2).InfoS("Cross-referencing bindings with picked clusters", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
+	toCreate, toUpdate, toDelete, toRemain, err := crossReferencePickedCustersAndBindings(crpName, policy, picked, active, creating, obsolete)
 
 	// Manipulate bindings accordingly.
 	klog.V(2).InfoS("Creating, updating, and/or deleting bindings", "policy", schedulingPolicySnapshotRef)
 
 	// Create new bindings.
 	if err := f.createBindings(ctx, toCreate); err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "Failed to create new bindings", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, err
 	}
 
@@ -490,7 +479,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// Note that a race condition may arise here, when a rollout controller attempts to update bindings
 	// at the same time with the scheduler. An error induced requeue will happen in this case.
 	if err := f.updateBindings(ctx, toUpdate); err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "Failed to update old bindings", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, err
 	}
 
@@ -499,7 +488,25 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// Note that a race condition may arise here, when a rollout controller attempts to update bindings
 	// at the same time with the scheduler. An error induced requeue will happen in this case.
 	if err := f.markAsDeletingFor(ctx, toDelete); err != nil {
-		klog.ErrorS(err, errorMessage, schedulingPolicySnapshotRef)
+		klog.ErrorS(err, "failed to mark bindings as deleting", "schedulingPolicySnapshot", schedulingPolicySnapshotRef)
 		return ctrl.Result{}, err
 	}
+
+	// Update the policy snapshot status to reflect the latest decisions made in this scheduling cycle.
+
+	// Refresh scheduling decisions.
+	refreshedSchedulingDecisions := newSchedulingDecisionsFrom(filtered, toCreate, toUpdate, toRemain)
+	// Prepare new scheduling condition.
+	//
+	// At this point, the scheduler considers the policy to be fully scheduled.
+	newSchedulingCondition := fullyScheduledCondition(policy)
+
+	// Update the policy snapshot status.
+	//
+	// Note that the op would fail if the policy snapshot is not the latest, so that consistency is
+	// preserved.
+ if err := f.updatePolicySnapshotStatus(ctx, policy, refreshedSchedulingDecisions, newSchedulingCondition); err != nil { + klog.ErrorS(err, "failed to update policy snapshot status (scheduling cycle completed)", "schedulingPolicySnapshot", schedulingPolicySnapshotRef) return ctrl.Result{}, err } @@ -518,6 +525,8 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p RequeueAfter: requeueDelay, }, nil } + + // The scheduling cycle has completed. return ctrl.Result{}, nil } @@ -599,15 +608,24 @@ func (s sortByCreationTimestampBindings) Less(i, j int) bool { // To minimize interruptions, the scheduler picks creating bindings first (in any order); if there are still more // bindings to trim, the scheduler prefers ones with smaller CreationTimestamps. func (f *framework) downscale(ctx context.Context, active, creating []*fleetv1beta1.ClusterResourceBinding, count int) (updatedActive, updatedCreating []*fleetv1beta1.ClusterResourceBinding, err error) { - if count <= len(creating) { - // Trimming creating bindings would suffice. - bindingsToDelete := creating[:count] - return active, creating[count:], f.markAsDeletingFor(ctx, bindingsToDelete) + if count == 0 { + // Skip if the downscale count is zero. + return active, creating, nil } - // Trim all creating bindings. + // Trim creating bindings. bindingsToDelete := make([]*fleetv1beta1.ClusterResourceBinding, 0, count) - bindingsToDelete = append(bindingsToDelete, creating...) + for i := 0; i < len(creating) && i < count; i++ { + binding := creating[i] + bindingsToDelete = append(bindingsToDelete, binding) + } + + if len(bindingsToDelete) == count { + // Trimming creating bindings alone would suffice. + return active, creating[count:], f.markAsDeletingFor(ctx, bindingsToDelete) + } + + // Trimming obsolete and creating bindings alone is not enough, move on to active bindings. // Sort the bindings by their CreationTimestamps. sorted := sortByCreationTimestampBindings(active) @@ -626,11 +644,21 @@ func (f *framework) downscale(ctx context.Context, active, creating []*fleetv1be // updatePolicySnapshotStatus updates the status of a policy snapshot, setting new scheduling decisions // and condition on the object. -func (f *framework) updatePolicySnapshotStatus(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, decisions []fleetv1beta1.ClusterDecision, condition metav1.Condition) error { +// +// This function will perform a sameness (no change) check, and skip the update if there is no change +// in the decisions and the condition. +func (f *framework) updatePolicySnapshotStatus(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, newDecisions []fleetv1beta1.ClusterDecision, newCondition metav1.Condition) error { errorFormat := "failed to update policy snapshot status: %w" - policy.Status.ClusterDecisions = decisions - meta.SetStatusCondition(&policy.Status.Conditions, condition) + currentDecisions := policy.Status.ClusterDecisions + currentCondition := meta.FindStatusCondition(policy.Status.Conditions, string(fleetv1beta1.PolicySnapshotScheduled)) + if equalDecisions(currentDecisions, newDecisions) && condition.EqualCondition(currentCondition, &newCondition) { + // Skip if there is no change in decisions and conditions. 
+ return nil + } + + policy.Status.ClusterDecisions = newDecisions + meta.SetStatusCondition(&policy.Status.Conditions, newCondition) if err := f.client.Status().Update(ctx, policy, &client.UpdateOptions{}); err != nil { return controller.NewAPIServerError(fmt.Errorf(errorFormat, err)) } diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go index bded0b764..ff018bac3 100644 --- a/pkg/scheduler/framework/framework_test.go +++ b/pkg/scheduler/framework/framework_test.go @@ -30,6 +30,7 @@ import ( const ( CRPName = "test-placement" policyName = "test-policy" + altPolicyName = "another-test-policy" bindingName = "test-binding" altBindingName = "another-test-binding" clusterName = "bravelion" @@ -142,6 +143,12 @@ func TestCollectBindings(t *testing.T) { func TestClassifyBindings(t *testing.T) { now := metav1.Now() + policy := &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + }, + } + clusterName1 := "cluster-1" clusterName2 := "cluster-2" clusterName3 := "cluster-3" @@ -202,8 +209,9 @@ func TestClassifyBindings(t *testing.T) { Name: "binding-4", }, Spec: fleetv1beta1.ResourceBindingSpec{ - State: fleetv1beta1.BindingStateActive, - TargetCluster: clusterName3, + State: fleetv1beta1.BindingStateActive, + TargetCluster: clusterName3, + PolicySnapshotName: altPolicyName, }, } assocaitedWithDisappearedClusterBinding := fleetv1beta1.ClusterResourceBinding{ @@ -211,26 +219,39 @@ func TestClassifyBindings(t *testing.T) { Name: "binding-5", }, Spec: fleetv1beta1.ResourceBindingSpec{ - State: fleetv1beta1.BindingStateCreating, - TargetCluster: clusterName4, + State: fleetv1beta1.BindingStateCreating, + TargetCluster: clusterName4, + PolicySnapshotName: policyName, }, } - activeBinding := fleetv1beta1.ClusterResourceBinding{ + obsoleteBinding := fleetv1beta1.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: "binding-6", }, Spec: fleetv1beta1.ResourceBindingSpec{ - State: fleetv1beta1.BindingStateActive, - TargetCluster: clusterName1, + State: fleetv1beta1.BindingStateActive, + TargetCluster: clusterName1, + PolicySnapshotName: altPolicyName, }, } - creatingBinding := fleetv1beta1.ClusterResourceBinding{ + activeBinding := fleetv1beta1.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: "binding-7", }, Spec: fleetv1beta1.ResourceBindingSpec{ - State: fleetv1beta1.BindingStateCreating, - TargetCluster: clusterName2, + State: fleetv1beta1.BindingStateActive, + TargetCluster: clusterName1, + PolicySnapshotName: policyName, + }, + } + creatingBinding := fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-8", + }, + Spec: fleetv1beta1.ResourceBindingSpec{ + State: fleetv1beta1.BindingStateCreating, + TargetCluster: clusterName2, + PolicySnapshotName: policyName, }, } @@ -240,15 +261,17 @@ func TestClassifyBindings(t *testing.T) { deletingBinding, associatedWithLeavingClusterBinding, assocaitedWithDisappearedClusterBinding, + obsoleteBinding, activeBinding, creatingBinding, } wantActive := []*fleetv1beta1.ClusterResourceBinding{&activeBinding} wantCreating := []*fleetv1beta1.ClusterResourceBinding{&creatingBinding} + wantObsolete := []*fleetv1beta1.ClusterResourceBinding{&obsoleteBinding} wantDeleted := []*fleetv1beta1.ClusterResourceBinding{&markedForDeletionWithFinalizerBinding} - wantObsolete := []*fleetv1beta1.ClusterResourceBinding{&associatedWithLeavingClusterBinding, &assocaitedWithDisappearedClusterBinding} + wantDangling := 
[]*fleetv1beta1.ClusterResourceBinding{&associatedWithLeavingClusterBinding, &assocaitedWithDisappearedClusterBinding} - active, creating, obsolete, deleted := classifyBindings(bindings, clusters) + active, creating, obsolete, dangling, deleted := classifyBindings(policy, bindings, clusters) if !cmp.Equal(active, wantActive) { t.Errorf("classifyBindings() active = %v, want %v", active, wantActive) } @@ -261,6 +284,10 @@ func TestClassifyBindings(t *testing.T) { t.Errorf("classifyBindings() obsolete = %v, want %v", obsolete, wantObsolete) } + if !cmp.Equal(dangling, wantDangling) { + t.Errorf("classifyBindings() dangling = %v, want %v", dangling, wantDangling) + } + if !cmp.Equal(deleted, wantDeleted) { t.Errorf("classifyBindings() deleted = %v, want %v", deleted, wantDeleted) } @@ -307,6 +334,7 @@ func TestShouldDownscale(t *testing.T) { policy *fleetv1beta1.ClusterPolicySnapshot desired int present int + obsolete int wantAct bool wantCount int }{ @@ -355,11 +383,29 @@ func TestShouldDownscale(t *testing.T) { wantAct: true, wantCount: 1, }, + { + name: "should downscale (obsolete bindings)", + policy: &fleetv1beta1.ClusterPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + }, + Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{ + Policy: &fleetv1beta1.PlacementPolicy{ + PlacementType: fleetv1beta1.PickNPlacementType, + }, + }, + }, + desired: 1, + present: 1, + obsolete: 1, + wantAct: true, + wantCount: 0, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - act, count := shouldDownscale(tc.policy, tc.desired, tc.present) + act, count := shouldDownscale(tc.policy, tc.desired, tc.present, tc.obsolete) if act != tc.wantAct || count != tc.wantCount { t.Fatalf("shouldDownscale() = %v, %v, want %v, %v", act, count, tc.wantAct, tc.wantCount) } diff --git a/pkg/scheduler/framework/frameworkutils.go b/pkg/scheduler/framework/frameworkutils.go index b3194e881..d4a5707a9 100644 --- a/pkg/scheduler/framework/frameworkutils.go +++ b/pkg/scheduler/framework/frameworkutils.go @@ -25,15 +25,19 @@ import ( // have been cleared for processing by the dispatcher; and // - creating bindings, i.e., bindings that have been associated with a normally operating cluster, // but have not yet been cleared for processing by the dispatcher; and -// - obsolete bindings, i.e., bindings that are no longer associated with a normally operating -// cluster, but have not yet been marked as deleting by the scheduler; and +// - dangling bindings, i.e., bindings that are associated with a cluster that is no longer in +// a normally operating state (the cluster has left the fleet, or is in the state of leaving), +// yet has not been marked as deleting by the scheduler; and +// - obsolete bindings, i.e., bindings that are no longer associated with the latest scheduling +// policy; and // - deleted bindings, i.e, bindings that are marked for deletion in the API server, but have not // yet been marked as deleting by the scheduler. -func classifyBindings(bindings []fleetv1beta1.ClusterResourceBinding, clusters []fleetv1beta1.MemberCluster) (active, creating, obsolete, deleted []*fleetv1beta1.ClusterResourceBinding) { +func classifyBindings(policy *fleetv1beta1.ClusterPolicySnapshot, bindings []fleetv1beta1.ClusterResourceBinding, clusters []fleetv1beta1.MemberCluster) (active, creating, obsolete, dangling, deleted []*fleetv1beta1.ClusterResourceBinding) { // Pre-allocate arrays. 
active = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) creating = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) obsolete = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) + dangling = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) deleted = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) // Build a map for clusters for quick loopup. @@ -47,9 +51,12 @@ func classifyBindings(bindings []fleetv1beta1.ClusterResourceBinding, clusters [ targetCluster, isTargetClusterPresent := clusterMap[binding.Spec.TargetCluster] switch { - case binding.DeletionTimestamp != nil && controllerutil.ContainsFinalizer(&binding, fleetv1beta1.SchedulerCleanupFinalizer): + case binding.DeletionTimestamp != nil: // Check if the binding has been deleted, but still has the scheduler cleanup finalizer in presence. - deleted = append(deleted, &binding) + if controllerutil.ContainsFinalizer(&binding, fleetv1beta1.SchedulerCleanupFinalizer) { + deleted = append(deleted, &binding) + } + // Ignore any binding that has been deleted, and has no scheduler cleanup finalizer in presence. case binding.Spec.State == fleetv1beta1.BindingStateDeleting: // Ignore any binding that is of the deleting state. case !isTargetClusterPresent || targetCluster.Spec.State == fleetv1beta1.ClusterStateLeave: @@ -58,6 +65,9 @@ func classifyBindings(bindings []fleetv1beta1.ClusterResourceBinding, clusters [ // // Note that this check is solely for the purpose of detecting a situation where bindings are stranded // on a leaving/left cluster; it does not perform any binding association eligibility check for the cluster. + dangling = append(dangling, &binding) + case binding.Spec.PolicySnapshotName != policy.Name: + // The binding is in the creating or active state, but is no longer associated with the latest scheduling policy snapshot. obsolete = append(obsolete, &binding) case binding.Spec.State == fleetv1beta1.BindingStateCreating: // Check if the binding is of the creating state. @@ -68,14 +78,20 @@ func classifyBindings(bindings []fleetv1beta1.ClusterResourceBinding, clusters [ } } - return active, creating, obsolete, deleted + return active, creating, obsolete, dangling, deleted } -// shouldDownscale checks if the scheduler needs to perform some downscaling, and (if so) how many bindings +// shouldDownscale checks if the scheduler needs to perform some downscaling, and (if so) how many active or creating bindings // it should remove. -func shouldDownscale(policy *fleetv1beta1.ClusterPolicySnapshot, desired, present int) (act bool, count int) { - if policy.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType && desired < present { - return true, present - desired +func shouldDownscale(policy *fleetv1beta1.ClusterPolicySnapshot, desired, present, obsolete int) (act bool, count int) { + if policy.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType && desired <= present { + // Downscale only applies to CRPs of the Pick N placement type; and it only applies when the number of + // clusters requested by the user is less than the number of currently active + creating bindings combined; + // or there are the right number of active + creating bindings, yet some obsolete bindings still linger + // in the system. 
+ if count := present - desired + obsolete; count > 0 { + return true, present - desired + } } return false, 0 } @@ -154,22 +170,6 @@ func equalDecisions(current, desired []fleetv1beta1.ClusterDecision) bool { return len(current) == len(desired) } -// notFullyScheduledCondition returns a condition for not fully scheduled policy snapshot. -func notFullyScheduledCondition(policy *fleetv1beta1.ClusterPolicySnapshot, desiredCount int) metav1.Condition { - message := notFullyScheduledMessage - if policy.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType { - message = fmt.Sprintf("%s: expected count %d, current count %d", message, policy.Spec.Policy.NumberOfClusters, desiredCount) - } - return metav1.Condition{ - Type: string(fleetv1beta1.PolicySnapshotScheduled), - Status: metav1.ConditionFalse, - ObservedGeneration: policy.Generation, - LastTransitionTime: metav1.Now(), - Reason: notFullyScheduledReason, - Message: message, - } -} - // calcNumOfClustersToSelect calculates the number of clusters to select in a scheduling run; it // essentially returns the minimum among the desired number of clusters, the batch size limit, // and the number of scored clusters. @@ -265,19 +265,22 @@ func pickTopNScoredClusters(scoredClusters ScoredClusters, N int) ScoredClusters // // - bindings that should be created, i.e., create a binding for every cluster that is newly picked // and does not have a binding associated with; -// - bindings that should be updated, i.e., associate a binding whose target cluster is picked again -// in the current run with the latest scheduling policy snapshot (if applicalbe); +// - bindings that should be updated, i.e., associate a binding, whose target cluster is picked again +// in the current run, with the latest scheduling policy snapshot (if applicable); // - bindings that should be deleted, i.e., mark a binding as deleting if its target cluster is no // longer picked in the current run. +// - bindings that require no change; such bindings are created in accordance with the latest +// scheduling policy already. // // Note that this function will return bindings with all fields fulfilled/refreshed, as applicable. -func crossReferencePickedCustersAndBindings(crpName string, policy *fleetv1beta1.ClusterPolicySnapshot, picked ScoredClusters, existing ...[]*fleetv1beta1.ClusterResourceBinding) (toCreate, toUpdate, toDelete []*fleetv1beta1.ClusterResourceBinding, err error) { +func crossReferencePickedCustersAndBindings(crpName string, policy *fleetv1beta1.ClusterPolicySnapshot, picked ScoredClusters, existing ...[]*fleetv1beta1.ClusterResourceBinding) (toCreate, toUpdate, toDelete, toRemain []*fleetv1beta1.ClusterResourceBinding, err error) { errorFormat := "failed to cross reference picked clusters and existing bindings: %w" // Pre-allocate with a reasonable capacity. toCreate = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(picked)) toUpdate = make([]*fleetv1beta1.ClusterResourceBinding, 0, 20) toDelete = make([]*fleetv1beta1.ClusterResourceBinding, 0, 20) + toRemain = make([]*fleetv1beta1.ClusterResourceBinding, 0, 20) // Build a map of picked scored clusters for quick lookup. pickedMap := make(map[string]*ScoredCluster) @@ -290,10 +293,19 @@ func crossReferencePickedCustersAndBindings(crpName string, policy *fleetv1beta1 for _, bindingSet := range existing { for _, binding := range bindingSet { + if binding.Spec.PolicySnapshotName == policy.Name { + // The binding is already associated with the latest scheduling policy; no change + // is necessary. 
+				toRemain = append(toRemain, binding)
+				continue
+			}
+
 			scored, ok := pickedMap[binding.Spec.TargetCluster]
 			checked[binding.Spec.TargetCluster] = true
+
 			if ok {
-				// The binding's target cluster is picked again in the current run.
+				// The binding's target cluster is picked again in the current run; yet the binding
+				// is originally created/updated in accordance with an out-of-date scheduling policy.
 				// Update the binding so that it is associated with the latest score.
 				affinityScore := int32(scored.Score.AffinityScore)
@@ -303,11 +315,12 @@ func crossReferencePickedCustersAndBindings(crpName string, policy *fleetv1beta1
 						TopologySpreadScore: &topologySpreadScore,
 					}
+				// Update the binding so that it is associated with the latest scheduling policy.
+				binding.Spec.PolicySnapshotName = policy.Name
+
 				// Add the binding to the toUpdate list.
 				toUpdate = append(toUpdate, binding)
 			} else {
-				// The binding's target cluster is not picked in the current run; add the binding to
-				// the toDelete list.
 				toDelete = append(toDelete, binding)
 			}
 		}
@@ -319,7 +332,7 @@ func crossReferencePickedCustersAndBindings(crpName string, policy *fleetv1beta1
 			name, err := uniquename.ClusterResourceBindingUniqueName(crpName, scored.Cluster.Name)
 			if err != nil {
 				// Cannot get a unique name for the binding; normally this should never happen.
-				return nil, nil, nil, controller.NewUnexpectedBehaviorError(fmt.Errorf(errorFormat, err))
+				return nil, nil, nil, nil, controller.NewUnexpectedBehaviorError(fmt.Errorf(errorFormat, err))
 			}
 			affinityScore := int32(scored.Score.AffinityScore)
 			topologySpreadScore := int32(scored.Score.TopologySpreadScore)
@@ -338,7 +351,8 @@ func crossReferencePickedCustersAndBindings(crpName string, policy *fleetv1beta1
 					State: fleetv1beta1.BindingStateCreating,
 					// Leave the associated resource snapshot name empty; it is up to another controller
 					// to fulfill this field.
-					TargetCluster: scored.Cluster.Name,
+					PolicySnapshotName: policy.Name,
+					TargetCluster:      scored.Cluster.Name,
 					ClusterDecision: fleetv1beta1.ClusterDecision{
 						ClusterName: scored.Cluster.Name,
 						Selected:    true,
@@ -353,7 +367,35 @@ func crossReferencePickedCustersAndBindings(crpName string, policy *fleetv1beta1
 		}
 	}
 
-	return toCreate, toUpdate, toDelete, nil
+	return toCreate, toUpdate, toDelete, toRemain, nil
+}
+
+// newSchedulingDecisionsFrom returns a list of scheduling decisions, based on the newly manipulated list of
+// bindings and (if applicable) a list of filtered clusters.
+func newSchedulingDecisionsFrom(filtered []*filteredClusterWithStatus, existing ...[]*fleetv1beta1.ClusterResourceBinding) []fleetv1beta1.ClusterDecision {
+	// Pre-allocate with a reasonable capacity.
+	newDecisions := make([]fleetv1beta1.ClusterDecision, 0, maxClusterDecisionCount)
+
+	// Build new scheduling decisions.
+	for _, bindingSet := range existing {
+		for _, binding := range bindingSet {
+			newDecisions = append(newDecisions, binding.Spec.ClusterDecision)
+		}
+	}
+
+	// Move some decisions from unbound clusters, if there is still room.
+	if diff := maxClusterDecisionCount - len(newDecisions); diff > 0 {
+		for i := 0; i < diff && i < len(filtered); i++ {
+			clusterWithStatus := filtered[i]
+			newDecisions = append(newDecisions, fleetv1beta1.ClusterDecision{
+				ClusterName: clusterWithStatus.cluster.Name,
+				Selected:    false,
+				Reason:      clusterWithStatus.status.String(),
+			})
+		}
+	}
+
+	return newDecisions
 }
 
 // shouldRequeue determines if the scheduler should start another scheduling cycle on the same