From 5384bda3c3a657dda04db4ef7aac555f9f13c347 Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Mon, 3 Jul 2023 19:52:59 +0800 Subject: [PATCH] Added more scheduler logic --- apis/placement/v1beta1/binding_types.go | 8 +- .../v1beta1/clusterresourceplacement_types.go | 4 + pkg/scheduler/framework/framework.go | 79 ++-- pkg/scheduler/framework/framework_test.go | 77 +--- pkg/scheduler/framework/frameworkutils.go | 15 +- pkg/scheduler/queue/queue.go | 32 ++ pkg/scheduler/scheduler.go | 338 ++++++++++++++++++ 7 files changed, 413 insertions(+), 140 deletions(-) diff --git a/apis/placement/v1beta1/binding_types.go b/apis/placement/v1beta1/binding_types.go index 6ac51c392..6f63ac5de 100644 --- a/apis/placement/v1beta1/binding_types.go +++ b/apis/placement/v1beta1/binding_types.go @@ -10,12 +10,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const ( - // SchedulerCleanupFinalizer is a finalizer added by the scheduler to all bindings, to make sure - // that the scheduler can react to binding deletions if necessary. - SchedulerCleanupFinalizer = fleetPrefix + "scheduler-cleanup" -) - // +kubebuilder:object:root=true // +kubebuilder:resource:scope=Cluster,categories={fleet},shortName=rb // +kubebuilder:subresource:status @@ -48,7 +42,7 @@ type ResourceBindingSpec struct { // it points to the name of the leading snapshot of the index group. ResourceSnapshotName string `json:"resourceSnapshotName"` - // PolicySnapshtName is the name of the scheduling policy snapshot that this resource binding + // PolicySnapshotName is the name of the scheduling policy snapshot that this resource binding // points to; more specifically, the scheduler creates this bindings in accordance with this // scheduling policy snapshot. PolicySnapshotName string `json:"policySnapshotName"` diff --git a/apis/placement/v1beta1/clusterresourceplacement_types.go b/apis/placement/v1beta1/clusterresourceplacement_types.go index c028d8eaa..28c87ef93 100644 --- a/apis/placement/v1beta1/clusterresourceplacement_types.go +++ b/apis/placement/v1beta1/clusterresourceplacement_types.go @@ -11,6 +11,10 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) +const ( + SchedulerCRPCleanupFinalizer = fleetPrefix + "scheduler-crp-cleanup" +) + // +genclient // +genclient:nonNamespaced // +kubebuilder:object:root=true diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index d2308698b..16f834f4a 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -20,7 +20,6 @@ import ( "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/scheduler/framework/parallelizer" @@ -239,20 +238,11 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p // * dangling bindings, i.e., bindings that are associated with a cluster that is no longer // in a normally operating state (the cluster has left the fleet, or is in the state of leaving), // yet has not been marked as deleting by the scheduler; and - // * deleted bindings, i.e, bindings that are marked for deletion in the API server, but have not - // yet been marked as deleting by the scheduler. // // Note that bindings marked as deleting are ignored by the scheduler (unless they are already marked // for deletion and still has the scheduler cleanup finalizer), as they // are irrelevant to the scheduling cycle. 
- active, creating, obsolete, dangling, deleted := classifyBindings(policy, bindings, clusters) - - // If a binding has been marked for deletion yet still has the scheduler cleanup finalizer, - // remove the finalizer from it to clear it for GC. - if err := f.removeSchedulerCleanupFinalizerFrom(ctx, deleted); err != nil { - klog.ErrorS(err, "failed to remove scheduler cleanup finalizer from deleted bindings", "schedulingPolicySnapshot", policyRef) - return ctrl.Result{}, err - } + active, creating, obsolete, dangling := classifyBindings(policy, bindings, clusters) // Mark all dangling bindings as deleting. if err := f.markAsDeletingFor(ctx, dangling); err != nil { @@ -269,10 +259,13 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p switch policy.Spec.Policy.PlacementType { case fleetv1beta1.PickAllPlacementType: - return f.runSchedulingCycleForPickAllPlacementType(ctx, state, crpName, policy, policyRef, clusters, active, creating, obsolete) + // Run the scheduling cycle for policy of the PickAll placement type. + return f.runSchedulingCycleForPickAllPlacementType(ctx, state, crpName, policy, clusters, active, creating, obsolete) case fleetv1beta1.PickNPlacementType: - return f.runSchedulingCycleForPickNPlacementType(ctx, state, crpName, policy, policyRef, clusters, active, creating, obsolete) + // Run the scheduling cycle for policy of the PickN placement type. + return f.runSchedulingCycleForPickNPlacementType(ctx, state, crpName, policy, clusters, active, creating, obsolete) default: + // This normally should never occur. klog.ErrorS(err, "unknown placement type", "schedulingPolicySnapshot", policyRef) return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err) } @@ -283,10 +276,11 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p func (f *framework) updatePolicySnapshotStatusFrom( ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, - policyRef klog.ObjectRef, filtered []*filteredClusterWithStatus, existing ...[]*fleetv1beta1.ClusterResourceBinding, ) error { + policyRef := klog.KObj(policy) + // Prepare new scheduling decisions. newDecisions := newSchedulingDecisionsFrom(f.maxClusterDecisionCount, filtered, existing...) // Prepare new scheduling condition. @@ -309,7 +303,9 @@ func (f *framework) updatePolicySnapshotStatusFrom( } // manipulateBindings creates, updates, and deletes bindings. -func (f *framework) manipulateBindings(ctx context.Context, policyRef klog.ObjectRef, toCreate, toUpdate, toDelete []*fleetv1beta1.ClusterResourceBinding) error { +func (f *framework) manipulateBindings(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, toCreate, toUpdate, toDelete []*fleetv1beta1.ClusterResourceBinding) error { + policyRef := klog.KObj(policy) + // Create new bindings. if err := f.createBindings(ctx, toCreate); err != nil { klog.ErrorS(err, "failed to create new bindings", "schedulingPolicySnapshot", policyRef) @@ -318,8 +314,9 @@ func (f *framework) manipulateBindings(ctx context.Context, policyRef klog.Objec // Update existing bindings. // - // Note that a race condition may arise here, when a rollout controller attempts to update bindings - // at the same time with the scheduler. An error induced requeue will happen in this case. 
+ // A race condition may arise here, when a rollout controller attempts to update bindings + // at the same time with the scheduler, e.g., marking a binding as active (from the creating + // state) if err := f.updateBindings(ctx, toUpdate); err != nil { klog.ErrorS(err, "failed to update old bindings", "schedulingPolicySnapshot", policyRef) return err @@ -350,9 +347,10 @@ func (f *framework) runAllPluginsForPickAllPlacementType( ctx context.Context, state *CycleState, policy *fleetv1beta1.ClusterPolicySnapshot, - policyRef klog.ObjectRef, clusters []fleetv1beta1.MemberCluster, ) (scored ScoredClusters, filtered []*filteredClusterWithStatus, err error) { + policyRef := klog.KObj(policy) + // Run pre-filter plugins. // // Each plugin can: @@ -397,16 +395,17 @@ func (f *framework) runSchedulingCycleForPickAllPlacementType( state *CycleState, crpName string, policy *fleetv1beta1.ClusterPolicySnapshot, - policyRef klog.ObjectRef, clusters []fleetv1beta1.MemberCluster, active, creating, obsolete []*fleetv1beta1.ClusterResourceBinding, ) (result ctrl.Result, err error) { + policyRef := klog.KObj(policy) + // The scheduler always needs to take action when processing scheduling policies of the PickAll // placement type; enter the actual scheduling stages. klog.V(2).InfoS("Scheduling is needed; entering scheduling stages", "schedulingPolicySnapshot", policyRef) // Run all plugins needed. - scored, filtered, err := f.runAllPluginsForPickAllPlacementType(ctx, state, policy, policyRef, clusters) + scored, filtered, err := f.runAllPluginsForPickAllPlacementType(ctx, state, policy, clusters) if err != nil { klog.ErrorS(err, "failed to run all plugins (pickAll placement type)", "schedulingPolicySnapshot", policyRef) return ctrl.Result{}, err @@ -437,14 +436,14 @@ func (f *framework) runSchedulingCycleForPickAllPlacementType( // Manipulate bindings accordingly. klog.V(2).InfoS("Creating, updating, and/or deleting bindings", "policy", policyRef) - if err := f.manipulateBindings(ctx, policyRef, toCreate, toUpdate, toDelete); err != nil { + if err := f.manipulateBindings(ctx, policy, toCreate, toUpdate, toDelete); err != nil { klog.ErrorS(err, "failed to manipulate bindings", "schedulingPolicySnapshot", policyRef) return ctrl.Result{}, err } // Update policy snapshot status with the latest scheduling decisions and condition. klog.V(2).InfoS("Updating policy snapshot status", "schedulingPolicySnapshot", policyRef) - if err := f.updatePolicySnapshotStatusFrom(ctx, policy, policyRef, filtered, toCreate, toUpdate, active, creating); err != nil { + if err := f.updatePolicySnapshotStatusFrom(ctx, policy, filtered, toCreate, toUpdate, active, creating); err != nil { klog.ErrorS(err, "failed to update latest scheduling decisions and condition", "schedulingPolicySnapshot", policyRef) return ctrl.Result{}, err } @@ -460,11 +459,12 @@ func (f *framework) runAllPluginsForPickNPlacementType( ctx context.Context, state *CycleState, policy *fleetv1beta1.ClusterPolicySnapshot, - policyRef klog.ObjectRef, numOfClusters int, activeOrCreatingBindings int, clusters []fleetv1beta1.MemberCluster, ) (scored ScoredClusters, filtered []*filteredClusterWithStatus, err error) { + policyRef := klog.KObj(policy) + // Calculate the batch size. // // Note that obsolete bindings are not counted. 
@@ -554,10 +554,11 @@ func (f *framework) runSchedulingCycleForPickNPlacementType( state *CycleState, crpName string, policy *fleetv1beta1.ClusterPolicySnapshot, - policyRef klog.ObjectRef, clusters []fleetv1beta1.MemberCluster, active, creating, obsolete []*fleetv1beta1.ClusterResourceBinding, ) (result ctrl.Result, err error) { + policyRef := klog.KObj(policy) + // Retrieve the desired number of clusters from the policy. // // Note that for scheduling policies of the PickN type, this annotation is expected to be present. @@ -608,7 +609,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType( // Note that since there is no reliable way to determine the validity of old decisions added // to the policy snapshot status, we will only update the status with the known facts, i.e., // the clusters that are currently selected. - if err := f.updatePolicySnapshotStatusFrom(ctx, policy, policyRef, nil, active, creating); err != nil { + if err := f.updatePolicySnapshotStatusFrom(ctx, policy, nil, active, creating); err != nil { klog.ErrorS(err, "failed to update latest scheduling decisions and condition when downscaling", "schedulingPolicySnapshot", policyRef) return ctrl.Result{}, err } @@ -629,7 +630,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType( // Note that since there is no reliable way to determine the validity of old decisions added // to the policy snapshot status, we will only update the status with the known facts, i.e., // the clusters that are currently selected. - if err := f.updatePolicySnapshotStatusFrom(ctx, policy, policyRef, nil, active, creating); err != nil { + if err := f.updatePolicySnapshotStatusFrom(ctx, policy, nil, active, creating); err != nil { klog.ErrorS(err, "failed to update latest scheduling decisions and condition when no scheduling run is needed", "schedulingPolicySnapshot", policyRef) return ctrl.Result{}, err } @@ -642,7 +643,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType( klog.V(2).InfoS("Scheduling is needed; entering scheduling stages", "schedulingPolicySnapshot", policyRef) // Run all the plugins. - scored, filtered, err := f.runAllPluginsForPickNPlacementType(ctx, state, policy, policyRef, numOfClusters, len(active)+len(creating), clusters) + scored, filtered, err := f.runAllPluginsForPickNPlacementType(ctx, state, policy, numOfClusters, len(active)+len(creating), clusters) if err != nil { klog.ErrorS(err, "failed to run all plugins", "schedulingPolicySnapshot", policyRef) return ctrl.Result{}, err @@ -688,14 +689,14 @@ func (f *framework) runSchedulingCycleForPickNPlacementType( // Manipulate bindings accordingly. klog.V(2).InfoS("Creating, updating, and/or deleting bindings", "policy", policyRef) - if err := f.manipulateBindings(ctx, policyRef, toCreate, toUpdate, toDelete); err != nil { + if err := f.manipulateBindings(ctx, policy, toCreate, toUpdate, toDelete); err != nil { klog.ErrorS(err, "failed to manipulate bindings", "schedulingPolicySnapshot", policyRef) return ctrl.Result{}, err } // Update policy snapshot status with the latest scheduling decisions and condition. 
klog.V(2).InfoS("Updating policy snapshot status", "schedulingPolicySnapshot", policyRef) - if err := f.updatePolicySnapshotStatusFrom(ctx, policy, policyRef, filtered, toCreate, toUpdate, active, creating); err != nil { + if err := f.updatePolicySnapshotStatusFrom(ctx, policy, filtered, toCreate, toUpdate, active, creating); err != nil { klog.ErrorS(err, "failed to update latest scheduling decisions and condition", "schedulingPolicySnapshot", policyRef) return ctrl.Result{}, err } @@ -744,28 +745,14 @@ func (f *framework) collectBindings(ctx context.Context, crpName string) ([]flee return bindingList.Items, nil } -// removeSchedulerCleanupFinalizerFrom removes the scheduler cleanup finalizer from a list of bindings. -func (f *framework) removeSchedulerCleanupFinalizerFrom(ctx context.Context, bindings []*fleetv1beta1.ClusterResourceBinding) error { - errorFormat := "failed to remove scheduler finalizer from binding %s: %w" - - for _, binding := range bindings { - controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerCleanupFinalizer) - if err := f.client.Update(ctx, binding, &client.UpdateOptions{}); err != nil { - return controller.NewAPIServerError(fmt.Errorf(errorFormat, binding.Name, err)) - } - } - return nil -} - // markAsDeletingFor marks a list of bindings as deleting. func (f *framework) markAsDeletingFor(ctx context.Context, bindings []*fleetv1beta1.ClusterResourceBinding) error { errorFormat := "failed to mark binding %s as deleting: %w" for _, binding := range bindings { - // Deleting is a terminal state for a binding; since there is no point of return, and the - // scheduler has acknowledged its fate at this moment, it is safe for the scheduler to remove - // the scheduler cleanup finalizer from any binding that is of the deleting state. - controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerCleanupFinalizer) + // Note that Deleting is a terminal state for a binding; since there is no point of return, and the + // scheduler has acknowledged its fate at this moment, any binding marked as deletion will be + // disregarded by the scheduler from this point onwards. 
binding.Spec.State = fleetv1beta1.BindingStateDeleting if err := f.client.Update(ctx, binding, &client.UpdateOptions{}); err != nil { return controller.NewAPIServerError(fmt.Errorf(errorFormat, binding.Name, err)) diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go index 02eb21aab..d511cdf9e 100644 --- a/pkg/scheduler/framework/framework_test.go +++ b/pkg/scheduler/framework/framework_test.go @@ -21,9 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/scheduler/framework/parallelizer" @@ -145,8 +143,6 @@ func TestCollectBindings(t *testing.T) { } func TestClassifyBindings(t *testing.T) { - now := metav1.Now() - policy := &fleetv1beta1.ClusterPolicySnapshot{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, @@ -184,22 +180,6 @@ func TestClassifyBindings(t *testing.T) { }, } - markedForDeletionWithFinalizerBinding := fleetv1beta1.ClusterResourceBinding{ - ObjectMeta: metav1.ObjectMeta{ - Name: "binding-1", - Finalizers: []string{fleetv1beta1.SchedulerCleanupFinalizer}, - DeletionTimestamp: &now, - }, - } - markedForDeletionWithoutFinalizerBinding := fleetv1beta1.ClusterResourceBinding{ - ObjectMeta: metav1.ObjectMeta{ - Name: "binding-2", - DeletionTimestamp: &now, - }, - Spec: fleetv1beta1.ResourceBindingSpec{ - State: fleetv1beta1.BindingStateDeleting, - }, - } deletingBinding := fleetv1beta1.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: "binding-3", @@ -260,8 +240,6 @@ func TestClassifyBindings(t *testing.T) { } bindings := []fleetv1beta1.ClusterResourceBinding{ - markedForDeletionWithFinalizerBinding, - markedForDeletionWithoutFinalizerBinding, deletingBinding, associatedWithLeavingClusterBinding, assocaitedWithDisappearedClusterBinding, @@ -272,10 +250,9 @@ func TestClassifyBindings(t *testing.T) { wantActive := []*fleetv1beta1.ClusterResourceBinding{&activeBinding} wantCreating := []*fleetv1beta1.ClusterResourceBinding{&creatingBinding} wantObsolete := []*fleetv1beta1.ClusterResourceBinding{&obsoleteBinding} - wantDeleted := []*fleetv1beta1.ClusterResourceBinding{&markedForDeletionWithFinalizerBinding} wantDangling := []*fleetv1beta1.ClusterResourceBinding{&associatedWithLeavingClusterBinding, &assocaitedWithDisappearedClusterBinding} - active, creating, obsolete, dangling, deleted := classifyBindings(policy, bindings, clusters) + active, creating, obsolete, dangling := classifyBindings(policy, bindings, clusters) if !cmp.Equal(active, wantActive) { t.Errorf("classifyBindings() active = %v, want %v", active, wantActive) } @@ -291,44 +268,6 @@ func TestClassifyBindings(t *testing.T) { if !cmp.Equal(dangling, wantDangling) { t.Errorf("classifyBindings() dangling = %v, want %v", dangling, wantDangling) } - - if !cmp.Equal(deleted, wantDeleted) { - t.Errorf("classifyBindings() deleted = %v, want %v", deleted, wantDeleted) - } -} - -// TestRemoveSchedulerCleanupFinalizerFromBindings tests the removeSchedulerFinalizerFromBindings method. -func TestRemoveSchedulerCleanupFinalizerFromBindings(t *testing.T) { - binding := &fleetv1beta1.ClusterResourceBinding{ - ObjectMeta: metav1.ObjectMeta{ - Name: bindingName, - Finalizers: []string{fleetv1beta1.SchedulerCleanupFinalizer}, - }, - } - - fakeClient := fake.NewClientBuilder(). 
- WithScheme(scheme.Scheme). - WithObjects(binding). - Build() - // Construct framework manually instead of using NewFramework() to avoid mocking the controller manager. - f := &framework{ - client: fakeClient, - } - - ctx := context.Background() - if err := f.removeSchedulerCleanupFinalizerFrom(ctx, []*fleetv1beta1.ClusterResourceBinding{binding}); err != nil { - t.Fatalf("removeSchedulerFinalizerFromBindings() = %v, want no error", err) - } - - // Verify that the finalizer has been removed. - updatedBinding := &fleetv1beta1.ClusterResourceBinding{} - if err := fakeClient.Get(ctx, types.NamespacedName{Name: bindingName}, updatedBinding); err != nil { - t.Fatalf("Binding Get(%v) = %v, want no error", bindingName, err) - } - - if controllerutil.ContainsFinalizer(updatedBinding, fleetv1beta1.SchedulerCleanupFinalizer) { - t.Fatalf("Binding %s finalizers = %v, want no scheduler finalizer", bindingName, updatedBinding.Finalizers) - } } // TestShouldDownscale tests the shouldDownscale function. @@ -711,7 +650,7 @@ func TestUpdatePolicySnapshotStatusFrom(t *testing.T) { } ctx := context.Background() - if err := f.updatePolicySnapshotStatusFrom(ctx, policy, klog.KObj(policy), tc.filtered, tc.existing...); err != nil { + if err := f.updatePolicySnapshotStatusFrom(ctx, policy, tc.filtered, tc.existing...); err != nil { t.Fatalf("updatePolicySnapshotStatusFrom() = %v, want no error", err) } @@ -1923,9 +1862,6 @@ func TestCrossReferencePickedCustersAndObsoleteBindings(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: bindingName1, - Finalizers: []string{ - fleetv1beta1.SchedulerCleanupFinalizer, - }, Labels: map[string]string{ fleetv1beta1.CRPTrackingLabel: crpName, }, @@ -1948,9 +1884,6 @@ func TestCrossReferencePickedCustersAndObsoleteBindings(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: bindingName2, - Finalizers: []string{ - fleetv1beta1.SchedulerCleanupFinalizer, - }, Labels: map[string]string{ fleetv1beta1.CRPTrackingLabel: crpName, }, @@ -1973,9 +1906,6 @@ func TestCrossReferencePickedCustersAndObsoleteBindings(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: bindingName3, - Finalizers: []string{ - fleetv1beta1.SchedulerCleanupFinalizer, - }, Labels: map[string]string{ fleetv1beta1.CRPTrackingLabel: crpName, }, @@ -2129,9 +2059,6 @@ func TestCrossReferencePickedCustersAndObsoleteBindings(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: bindingName3, - Finalizers: []string{ - fleetv1beta1.SchedulerCleanupFinalizer, - }, Labels: map[string]string{ fleetv1beta1.CRPTrackingLabel: crpName, }, diff --git a/pkg/scheduler/framework/frameworkutils.go b/pkg/scheduler/framework/frameworkutils.go index 48537c080..a0f9d5555 100644 --- a/pkg/scheduler/framework/frameworkutils.go +++ b/pkg/scheduler/framework/frameworkutils.go @@ -11,7 +11,6 @@ import ( "sort" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/scheduler/framework/uniquename" @@ -30,13 +29,12 @@ import ( // policy; and // - deleted bindings, i.e, bindings that are marked for deletion in the API server, but have not // yet been marked as deleting by the scheduler. 
-func classifyBindings(policy *fleetv1beta1.ClusterPolicySnapshot, bindings []fleetv1beta1.ClusterResourceBinding, clusters []fleetv1beta1.MemberCluster) (active, creating, obsolete, dangling, deleted []*fleetv1beta1.ClusterResourceBinding) { +func classifyBindings(policy *fleetv1beta1.ClusterPolicySnapshot, bindings []fleetv1beta1.ClusterResourceBinding, clusters []fleetv1beta1.MemberCluster) (active, creating, obsolete, dangling []*fleetv1beta1.ClusterResourceBinding) { // Pre-allocate arrays. active = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) creating = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) obsolete = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) dangling = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) - deleted = make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindings)) // Build a map for clusters for quick loopup. clusterMap := make(map[string]fleetv1beta1.MemberCluster) @@ -50,11 +48,7 @@ func classifyBindings(policy *fleetv1beta1.ClusterPolicySnapshot, bindings []fle switch { case binding.DeletionTimestamp != nil: - // Check if the binding has been deleted, but still has the scheduler cleanup finalizer in presence. - if controllerutil.ContainsFinalizer(&binding, fleetv1beta1.SchedulerCleanupFinalizer) { - deleted = append(deleted, &binding) - } - // Ignore any binding that has been deleted, and has no scheduler cleanup finalizer in presence. + // Ignore any binding that has been deleted. case binding.Spec.State == fleetv1beta1.BindingStateDeleting: // Ignore any binding that is of the deleting state. case !isTargetClusterPresent || targetCluster.Spec.State == fleetv1beta1.ClusterStateLeave: @@ -76,7 +70,7 @@ func classifyBindings(policy *fleetv1beta1.ClusterPolicySnapshot, bindings []fle } } - return active, creating, obsolete, dangling, deleted + return active, creating, obsolete, dangling } // shouldDownscale checks if the scheduler needs to perform some downscaling, and (if so) how many active or creating bindings @@ -250,9 +244,6 @@ func crossReferencePickedCustersAndObsoleteBindings(crpName string, policy *flee toCreate = append(toCreate, &fleetv1beta1.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Finalizers: []string{ - fleetv1beta1.SchedulerCleanupFinalizer, - }, Labels: map[string]string{ fleetv1beta1.CRPTrackingLabel: crpName, }, diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 328877145..a804650bb 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -8,6 +8,8 @@ Licensed under the MIT license. package queue import ( + "time" + "k8s.io/client-go/util/workqueue" ) @@ -18,7 +20,15 @@ type ClusterResourcePlacementKey string // ClusterResourcePlacementSchedulingQueueWriter is an interface which allows sources, such as controllers, to add // ClusterResourcePlacementKeys to the scheduling queue. type ClusterResourcePlacementSchedulingQueueWriter interface { + // Add adds a ClusterResourcePlacementKey to the work queue. + // + // Note that this bypasses the rate limiter. Add(crpKey ClusterResourcePlacementKey) + // AddRateLimited adds a ClusterResourcePlacementKey to the work queue after the rate limiter (if any) + // says that it is OK. + AddRateLimited(crpKey ClusterResourcePlacementKey) + // AddAfter adds a ClusterResourcePlacementKey to the work queue after a set duration. 
+ AddAfter(crpKey ClusterResourcePlacementKey, duration time.Duration) } // ClusterResourcePlacementSchedulingQueue is an interface which queues ClusterResourcePlacements for the scheduler @@ -37,6 +47,8 @@ type ClusterResourcePlacementSchedulingQueue interface { NextClusterResourcePlacementKey() (key ClusterResourcePlacementKey, closed bool) // Done marks a ClusterResourcePlacementKey as done. Done(crpKey ClusterResourcePlacementKey) + // Forget untracks a ClusterResourcePlacementKey from rate limiter(s) (if any) set up with the queue. + Forget(crpKey ClusterResourcePlacementKey) } // simpleClusterResourcePlacementSchedulingQueue is a simple implementation of @@ -121,10 +133,30 @@ func (sq *simpleClusterResourcePlacementSchedulingQueue) Done(crpKey ClusterReso } // Add adds a ClusterResourcePlacementKey to the work queue. +// +// Note that this bypasses the rate limiter (if any). func (sq *simpleClusterResourcePlacementSchedulingQueue) Add(crpKey ClusterResourcePlacementKey) { sq.active.Add(crpKey) } +// AddRateLimited adds a ClusterResourcePlacementKey to the work queue after the rate limiter (if any) +// says that it is OK. +func (sq *simpleClusterResourcePlacementSchedulingQueue) AddRateLimited(crpKey ClusterResourcePlacementKey) { + sq.active.AddRateLimited(crpKey) +} + +// AddAfter adds a ClusterResourcePlacementKey to the work queue after a set duration. +// +// Note that this bypasses the rate limiter (if any) +func (sq *simpleClusterResourcePlacementSchedulingQueue) AddAfter(crpKey ClusterResourcePlacementKey, duration time.Duration) { + sq.active.AddAfter(crpKey, duration) +} + +// Forget untracks a ClusterResourcePlacementKey from rate limiter(s) (if any) set up with the queue. +func (sq *simpleClusterResourcePlacementSchedulingQueue) Forget(crpKey ClusterResourcePlacementKey) { + sq.active.Forget(crpKey) +} + // NewSimpleClusterResourcePlacementSchedulingQueue returns a // simpleClusterResourcePlacementSchedulingQueue. func NewSimpleClusterResourcePlacementSchedulingQueue(opts ...Option) ClusterResourcePlacementSchedulingQueue { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f8aa48320..8346c7ae2 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -3,4 +3,342 @@ Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ +// Package scheduler features the scheduler for Fleet workloads. package scheduler + +import ( + "context" + "fmt" + "strconv" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/scheduler/framework" + "go.goms.io/fleet/pkg/scheduler/queue" + "go.goms.io/fleet/pkg/utils/controller" +) + +// Scheduler is the scheduler for Fleet workloads. +type Scheduler struct { + // name is the name of the scheduler. + name string + + // framework is the scheduling framework in use by the scheduler. + // + // At this stage, a scheduler is always associated with one scheduling framework; in the long + // run, multiple frameworks may be supported, to allow the usage of varying scheduling configurations + // for different types of workloads. 
+ framework framework.Framework + + // queue is the work queue in use by the scheduler; the scheduler pulls items from the queue and + // performs scheduling in accordance with them. + queue queue.ClusterResourcePlacementSchedulingQueue + + // client is the (cached) client in use by the scheduler for accessing Kubernetes API server. + client client.Client + // uncachedReader is the uncached read-only client in use by the scheduler for accessing + // Kubernetes API server; in most cases client should be used instead, unless consistency becomes + // a serious concern. + // TO-DO (chenyu1): explore the possbilities of using a mutation cache for better performance. + uncachedReader client.Reader + // manager is the controller manager in use by the scheduler. + manager ctrl.Manager + // eventRecorder is the event recorder in use by the scheduler. + eventRecorder record.EventRecorder +} + +// NewScheduler creates a scheduler. +func NewScheduler( + name string, + framework framework.Framework, + queue queue.ClusterResourcePlacementSchedulingQueue, + manager ctrl.Manager, +) *Scheduler { + return &Scheduler{ + name: name, + framework: framework, + queue: queue, + client: manager.GetClient(), + uncachedReader: manager.GetAPIReader(), + manager: manager, + eventRecorder: manager.GetEventRecorderFor(name), + } +} + +// ScheduleOnce performs scheduling for one single item pulled from the work queue. +func (s *Scheduler) scheduleOnce(ctx context.Context) { + // Retrieve the next item (name of a CRP) from the work queue. + // + // Note that this will block if no item is available. + crpName, closed := s.queue.NextClusterResourcePlacementKey() + if closed { + // End the run if the work queue has been closed. + klog.InfoS("Work queue has been closed") + return + } + + defer func() { + // Mark the key as done. + // + // Note that this will happen even if an error occurs. Should the key get requeued by Add() + // during the call, it will be added to the queue after this call returns. + s.queue.Done(crpName) + }() + + startTime := time.Now() + crpRef := klog.KRef("", string(crpName)) + klog.V(2).InfoS("Schedule once", "ClusterResourcePlacement", crpRef) + defer func() { + // Note that the time spent on pulling keys from the work queue (and the time spent on waiting + // for a key to arrive) is not counted here, as we cannot reliably distinguish between + // system processing latencies and actual duration of cluster resource placement absence. + latency := time.Since(startTime).Milliseconds() + klog.V(2).InfoS("Schedule once completed", "ClusterResourcePlacement", crpRef, "latency", latency) + }() + + // Retrieve the CRP. + crp := &fleetv1beta1.ClusterResourcePlacement{} + crpKey := types.NamespacedName{Name: string(crpName)} + if err := s.client.Get(ctx, crpKey, crp); err != nil { + if errors.IsNotFound(err) { + // The CRP has been gone before the scheduler gets a chance to + // process it; normally this would not happen as sources would not enqueue any CRP that + // has been marked for deletion but does not have the scheduler cleanup finalizer to + // the work queue. Such CRPs needs no further processing any way though, as the absence + // of the cleanup finalizer implies that bindings derived from the CRP are no longer present. + return + } + klog.ErrorS(err, "failed to get cluster resource placement", "clusterResourcePlacement", crpRef) + // Requeue for later processing. 
+ s.queue.AddRateLimited(crpName) + return + } + + // Check if the CRP has been marked for deletion, and if it has the scheduler cleanup finalizer. + if crp.DeletionTimestamp != nil { + if controllerutil.ContainsFinalizer(crp, fleetv1beta1.SchedulerCRPCleanupFinalizer) { + if err := s.CleanupAllBindingsFor(ctx, crp); err != nil { + klog.ErrorS(err, "failed to clean up all bindings for cluster resource placement", "clusterResourcePlacement", crpRef) + // Requeue for later processing. + s.queue.AddRateLimited(crpName) + return + } + } + // The CRP has been marked for deletion but no longer has the scheduler cleanup finalizer; no + // additional handling is needed. + + // Untrack the key from the rate limiter. + s.queue.Forget(crpName) + return + } + + // The CRP has not been marked for deletion; run the scheduling cycle for it. + + // Verify that it has an active policy snapshot. + latestPolicySnapshot, err := s.lookupLatestPolicySnapshot(ctx, crp) + if err != nil { + klog.ErrorS(err, "failed to lookup latest policy snapshot", "clusterResourcePlacement", crpRef) + // No requeue is needed; the scheduler will be triggered again when an active policy + // snapshot is created. + + // Untrack the key for quicker reprocessing. + s.queue.Forget(crpName) + return + } + + // Add the scheduler cleanup finalizer to the CRP (if it does not have one yet). + if err := s.addSchedulerCleanupFinalizer(ctx, crp); err != nil { + klog.ErrorS(err, "failed to add scheduler cleanup finalizer", "clusterResourcePlacement", crpRef) + // Requeue for later processing. + s.queue.AddRateLimited(crpName) + return + } + + // Run the scheduling cycle. + // + // Note that the scheduler will enter this cycle as long as the CRP is active and an active + // policy snapshot has been produced. + res, err := s.framework.RunSchedulingCycleFor(ctx, crp.Name, latestPolicySnapshot) + if err != nil { + klog.ErrorS(err, "failed to run scheduling cycle", "clusterResourcePlacement", crpRef) + // Requeue for later processing. + s.queue.AddRateLimited(crpName) + return + } + + // Requeue if the scheduling cycle suggests so. + if res.Requeue { + if res.RequeueAfter > 0 { + s.queue.AddAfter(crpName, res.RequeueAfter) + return + } + // Untrack the key from the rate limiter. + s.queue.Forget(crpName) + // Requeue for later processing. + // + // Note that the key is added directly to the queue without having to wait for any rate limiter's + // approval. This is necessary as requeues, requested by the scheduler, occur when the scheduler + // is certain that more scheduling work needs to be done but it cannot be completed in + // one cycle (e.g., a plugin sets up a per-cycle batch limit, and consequently the scheduler must + // finish the scheduling in multiple cycles); in such cases, rate limiter should not add + // any delay to the requeues. + s.queue.Add(crpName) + } +} + +// Run starts the scheduler. +func (s *Scheduler) Run(ctx context.Context) { + klog.V(2).InfoS("Starting the scheduler") + defer func() { + klog.V(2).InfoS("Stopping the scheduler") + }() + + // Starting the scheduling queue. + s.queue.Run() + + // Run scheduleOnce forever. + // + // The loop starts in a dedicated goroutine; it exits when the context is canceled. + go wait.UntilWithContext(ctx, s.scheduleOnce, 0) + + // Wait for the context to be canceled. + <-ctx.Done() + + // Stopping the scheduling queue; drain if necessary. + // + // Note that if a scheduling cycle is in progress; this will only return when the + // cycle completes. 
+ s.queue.CloseWithDrain() +} + +// Stop stops the scheduler. +func (s *Scheduler) Stop(ctx context.Context) {} + +// CleanupAllBindingsFor cleans up all bindings derived from a CRP. +func (s *Scheduler) CleanupAllBindingsFor(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) error { + crpRef := klog.KObj(crp) + + // List all bindings derived from the CRP. + // + // Note that the listing is performed using the uncached client; this is to ensure that all related + // bindings can be found, even if they have not been synced to the cache yet. + bindingList := &fleetv1beta1.ClusterResourceBindingList{} + listOptions := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{fleetv1beta1.CRPTrackingLabel: crp.Name}), + } + // TO-DO (chenyu1): this is a very expensive op; explore options for optimization. + if err := s.uncachedReader.List(ctx, bindingList, listOptions); err != nil { + klog.ErrorS(err, "failed to list all bindings", "ClusterResourcePlacement", crpRef) + return controller.NewUnexpectedBehaviorError(err) + } + + // Remove the scheduler cleanup finalizer from all the bindings, and delete them. + // + // Note that once a CRP has been marked for deletion, it will no longer enter the scheduling cycle, + // so any cleanup finalizer has to be removed here. + // + // Also note that for deleted CRPs, derived bindings are deleted right away by the scheduler; + // the scheduler no longer marks them as deleting and waits for another controller to actually + // run the deletion. + for _, binding := range bindingList.Items { + // Delete the binding if it has not been marked for deletion yet. + if binding.DeletionTimestamp == nil { + if err := s.client.Delete(ctx, &binding); err != nil { + klog.ErrorS(err, "failed to delete binding", "ClusterResourceBinding", binding) + return controller.NewAPIServerError(err) + } + } + + // Check if the binding has the scheduler cleanup finalizer. + if controllerutil.ContainsFinalizer(&binding, fleetv1beta1.SchedulerCRPCleanupFinalizer) { + // Remove the scheduler cleanup finalizer from the binding. + controllerutil.RemoveFinalizer(&binding, fleetv1beta1.SchedulerCRPCleanupFinalizer) + // Update the binding. + if err := s.client.Update(ctx, &binding); err != nil { + klog.ErrorS(err, "failed to remove scheduler cleanup finalizer from binding", "ClusterResourceBinding", crpRef) + return controller.NewAPIServerError(err) + } + } + } + + // All bindings have been deleted; remove the scheduler cleanup finalizer from the CRP. + controllerutil.RemoveFinalizer(crp, fleetv1beta1.SchedulerCRPCleanupFinalizer) + if err := s.client.Update(ctx, crp); err != nil { + klog.ErrorS(err, "failed to remove scheduler cleanup finalizer from cluster resource placement", "ClusterResourcePlacement", crpRef) + return controller.NewAPIServerError(err) + } + + return nil +} + +// lookupLatestPolicySnapshot returns the latest (i.e., active) policy snapshot associated with +// a CRP. +func (s *Scheduler) lookupLatestPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterPolicySnapshot, error) { + crpRef := klog.KObj(crp) + + // Find out the latest policy snapshot associated with the CRP. 
+	policySnapshotList := &fleetv1beta1.ClusterPolicySnapshotList{}
+	labelSelector := labels.SelectorFromSet(labels.Set{
+		fleetv1beta1.CRPTrackingLabel: crp.Name,
+		fleetv1beta1.IsLatestSnapshotLabel: strconv.FormatBool(true),
+	})
+	listOptions := &client.ListOptions{LabelSelector: labelSelector}
+	// The scheduler lists with a cached client; this does not have any consistency concern as sources
+	// will always trigger the scheduler when a new policy snapshot is created.
+	if err := s.client.List(ctx, policySnapshotList, listOptions); err != nil {
+		klog.ErrorS(err, "failed to list policy snapshots of a cluster resource placement", "ClusterResourcePlacement", crpRef)
+		return nil, controller.NewAPIServerError(err)
+	}
+
+	switch {
+	case len(policySnapshotList.Items) == 0:
+		// There is no latest policy snapshot associated with the CRP; it could happen when
+		// * the CRP is newly created; or
+		// * due to an unexpected situation, the sequence of policy snapshots is in an inconsistent state.
+		//
+		// Either way, it is out of the scheduler's scope to handle such a case; the scheduler will
+		// be triggered again if the situation is corrected.
+		err := fmt.Errorf("no latest policy snapshot associated with cluster resource placement")
+		klog.ErrorS(err, "no latest policy snapshot associated with cluster resource placement", "ClusterResourcePlacement", crpRef)
+		return nil, err
+	case len(policySnapshotList.Items) > 1:
+		// There are multiple active policy snapshots associated with the CRP; normally this
+		// will never happen, as only one policy snapshot can be active in the sequence.
+		//
+		// Similarly, it is out of the scheduler's scope to handle such a case; the scheduler will
+		// report this unexpected occurrence but does not register it as a scheduler-side error.
+		// If (and when) the situation is corrected, the scheduler will be triggered again.
+		err := fmt.Errorf("too many active policy snapshots: %d, want 1", len(policySnapshotList.Items))
+		klog.ErrorS(err, "multiple latest policy snapshots associated with cluster resource placement", "ClusterResourcePlacement", crpRef)
+		return nil, err
+	default:
+		// Found the one and only active policy snapshot.
+		return &policySnapshotList.Items[0], nil
+	}
+}
+
+// addSchedulerCleanupFinalizer adds the scheduler cleanup finalizer to a CRP (if it does not
+// have it yet).
+func (s *Scheduler) addSchedulerCleanupFinalizer(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) error {
+	// Add the finalizer only if the CRP does not have one yet.
+	if !controllerutil.ContainsFinalizer(crp, fleetv1beta1.SchedulerCRPCleanupFinalizer) {
+		controllerutil.AddFinalizer(crp, fleetv1beta1.SchedulerCRPCleanupFinalizer)
+
+		if err := s.client.Update(ctx, crp); err != nil {
+			klog.ErrorS(err, "failed to update cluster resource placement", "ClusterResourcePlacement", klog.KObj(crp))
+			return controller.NewAPIServerError(err)
+		}
+	}
+
+	return nil
+}
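
The following standalone sketch is not part of the change above; it illustrates how the queue methods introduced in this patch (Add, AddRateLimited, AddAfter, Forget) are expected to be used together with NextClusterResourcePlacementKey and Done, mirroring the flow of scheduleOnce. The CRP key names and the package main wrapper are hypothetical, added only for illustration.

// queueusage_sketch.go (illustrative only, not committed by this patch)
package main

import (
	"fmt"
	"time"

	"go.goms.io/fleet/pkg/scheduler/queue"
)

func main() {
	// Build and start the simple scheduling queue extended by this patch.
	q := queue.NewSimpleClusterResourcePlacementSchedulingQueue()
	q.Run()

	// A source (e.g., a CRP or policy snapshot watcher) enqueues CRP keys:
	// Add bypasses the rate limiter, AddRateLimited defers to it, and AddAfter delays the key
	// by a fixed duration. The key values here are made up.
	q.Add(queue.ClusterResourcePlacementKey("example-crp"))
	q.AddRateLimited(queue.ClusterResourcePlacementKey("example-crp-2"))
	q.AddAfter(queue.ClusterResourcePlacementKey("example-crp-3"), 100*time.Millisecond)

	// The scheduler side: pull the next key (blocking), process it, untrack it from the rate
	// limiter on success with Forget, and mark it Done so a later Add can requeue it.
	key, closed := q.NextClusterResourcePlacementKey()
	if !closed {
		fmt.Println("scheduling", key)
		q.Forget(key)
		q.Done(key)
	}

	// Drain and close the queue, as Scheduler.Run does on shutdown.
	q.CloseWithDrain()
}

Calling Forget only on success keeps the rate limiter's backoff in effect for keys that failed and were re-added with AddRateLimited, which is the same pattern scheduleOnce follows in scheduler.go.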