From 747406eee9fff6710359e8c8fb33ca830d5468ef Mon Sep 17 00:00:00 2001
From: michaelawyu
Date: Thu, 21 Mar 2024 17:40:01 +0800
Subject: [PATCH] feat: property-based scheduling: add support for
 property-based scheduling in the scheduler member cluster watcher (#714)

---
 .../watchers/membercluster/controller.go      |  46 +++-
 .../controller_integration_test.go            | 210 ++++++++++++++++++
 2 files changed, 252 insertions(+), 4 deletions(-)

diff --git a/pkg/scheduler/watchers/membercluster/controller.go b/pkg/scheduler/watchers/membercluster/controller.go
index cf07bed6a..496d0f618 100644
--- a/pkg/scheduler/watchers/membercluster/controller.go
+++ b/pkg/scheduler/watchers/membercluster/controller.go
@@ -12,6 +12,7 @@ import (
 	"reflect"
 	"time"
 
+	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/klog/v2"
@@ -55,7 +56,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	//
 	// It may happen for 2 reasons:
 	//
-	// a) the cluster setting, specifically its labels, has changed; and/or
+	// a) the cluster's setup (e.g., its labels) or status (e.g., resource/non-resource properties)
+	//    has changed; and/or
 	// b) an unexpected development which originally leads the scheduler to disregard the cluster
 	//    (e.g., agents not joining, network partition, etc.) has been resolved.
 	//
@@ -63,7 +65,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	//
 	// Similarly, it may happen for 2 reasons:
 	//
-	// a) the cluster setting, specifically its labels, has changed; and/or
+	// a) the cluster's setup (e.g., its labels) or status (e.g., resource/non-resource properties)
+	//    has changed; and/or
 	// b) an unexpected development (e.g., agents failing, network partition, etc.) has occurred.
 	// c) the cluster, which may or may not have resources placed on it, has left the fleet (deleting).
 	//
@@ -190,20 +193,55 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
 			return false
 		}
 
-		// Capture label changes.
-		//
 		clusterKObj := klog.KObj(newCluster)
 		// The cluster is being deleted.
 		if oldCluster.GetDeletionTimestamp().IsZero() && !newCluster.GetDeletionTimestamp().IsZero() {
 			klog.V(2).InfoS("A member cluster is leaving the fleet", "memberCluster", clusterKObj)
 			return true
 		}
+
+		// Capture label changes.
+		//
 		// Note that the controller runs only when label changes happen on joined clusters.
 		if !reflect.DeepEqual(oldCluster.Labels, newCluster.Labels) {
 			klog.V(2).InfoS("A member cluster label change has been detected", "memberCluster", clusterKObj)
 			return true
 		}
 
+		// Capture non-resource property changes.
+		//
+		// Observation time refreshes alone are not considered a change.
+		oldProperties := oldCluster.Status.Properties
+		newProperties := newCluster.Status.Properties
+		if len(oldProperties) != len(newProperties) {
+			return true
+		}
+		for oldK, oldV := range oldProperties {
+			newV, ok := newProperties[oldK]
+			if !ok || oldV.Value != newV.Value {
+				return true
+			}
+		}
+
+		// Capture resource usage changes.
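+		//
+		// Note that equality.Semantic compares resource quantities by value, so a change
+		// in formatting alone (e.g., 1 CPU reported as 1000m) does not trigger a re-queue.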
+		oldCapacity := oldCluster.Status.ResourceUsage.Capacity
+		newCapacity := newCluster.Status.ResourceUsage.Capacity
+		if !equality.Semantic.DeepEqual(oldCapacity, newCapacity) {
+			return true
+		}
+
+		oldAllocatable := oldCluster.Status.ResourceUsage.Allocatable
+		newAllocatable := newCluster.Status.ResourceUsage.Allocatable
+		if !equality.Semantic.DeepEqual(oldAllocatable, newAllocatable) {
+			return true
+		}
+
+		oldAvailable := oldCluster.Status.ResourceUsage.Available
+		newAvailable := newCluster.Status.ResourceUsage.Available
+		if !equality.Semantic.DeepEqual(oldAvailable, newAvailable) {
+			return true
+		}
+
 		// Check the resource placement eligibility for the old and new cluster object.
 		oldEligible, _ := r.ClusterEligibilityChecker.IsEligible(oldCluster)
 		newEligible, _ := r.ClusterEligibilityChecker.IsEligible(newCluster)
diff --git a/pkg/scheduler/watchers/membercluster/controller_integration_test.go b/pkg/scheduler/watchers/membercluster/controller_integration_test.go
index 22ab2eda4..f6e1376a9 100644
--- a/pkg/scheduler/watchers/membercluster/controller_integration_test.go
+++ b/pkg/scheduler/watchers/membercluster/controller_integration_test.go
@@ -14,6 +14,8 @@ import (
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 
@@ -32,6 +34,10 @@ const (
 	dummyReason     = "dummyReason"
 	dummyLabel      = "dummy-label"
 	dummyLabelValue = "dummy-label-value"
+
+	dummyNonResourcePropertyName   = "dummy-non-resource-property"
+	dummyNonResourcePropertyValue1 = "0"
+	dummyNonResourcePropertyValue2 = "1"
 )
 
 var (
@@ -150,6 +156,210 @@ var _ = Describe("scheduler member cluster source controller", Serial, Ordered,
 		})
 	})
 
+	Context("ready cluster has a non-resource property change", func() {
+		BeforeAll(func() {
+			Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
+
+			// Retrieve the cluster.
+			memberCluster := &clusterv1beta1.MemberCluster{}
+			Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")
+
+			// Update the list of non-resource properties.
+			memberCluster.Status.Properties = map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{
+				dummyNonResourcePropertyName: {
+					Value:           dummyNonResourcePropertyValue1,
+					ObservationTime: metav1.NewTime(time.Now()),
+				},
+			}
+			Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster non-resource properties")
+		})
+
+		It("should enqueue CRPs (case 1a)", func() {
+			Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+			Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+		})
+
+		It("can empty the key collector", func() {
+			keyCollector.Reset()
+			Eventually(noKeyEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is not empty")
+			Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
+		})
+
+		It("can update the property", func() {
+			// Retrieve the cluster.
+			memberCluster := &clusterv1beta1.MemberCluster{}
+			Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")
+
+			// Update the list of non-resource properties.
+			memberCluster.Status.Properties = map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{
+				dummyNonResourcePropertyName: {
+					Value:           dummyNonResourcePropertyValue2,
+					ObservationTime: metav1.NewTime(time.Now()),
+				},
+			}
+			Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster non-resource properties")
+		})
+
+		It("should enqueue CRPs (case 1a)", func() {
+			Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+			Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+		})
+
+		AfterAll(func() {
+			keyCollector.Reset()
+		})
+	})
+
+	Context("ready cluster has a total capacity change", func() {
+		BeforeAll(func() {
+			Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
+
+			// Retrieve the cluster.
+			memberCluster := &clusterv1beta1.MemberCluster{}
+			Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")
+
+			// Update the total capacity.
+			memberCluster.Status.ResourceUsage.Capacity = corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("1000m"),
+				corev1.ResourceMemory: resource.MustParse("1Gi"),
+			}
+			Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster total capacity")
+		})
+
+		It("should enqueue CRPs (case 1a)", func() {
+			Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+			Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+		})
+
+		It("can empty the key collector", func() {
+			keyCollector.Reset()
+			Eventually(noKeyEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is not empty")
+			Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
+		})
+
+		It("can update the total capacity", func() {
+			// Retrieve the cluster.
+			memberCluster := &clusterv1beta1.MemberCluster{}
+			Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")
+
+			// Update the total capacity.
+			memberCluster.Status.ResourceUsage.Capacity = corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("2000m"),
+				corev1.ResourceMemory: resource.MustParse("2Gi"),
+			}
+			Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster total capacity")
+		})
+
+		It("should enqueue CRPs (case 1a)", func() {
+			Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+			Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+		})
+
+		AfterAll(func() {
+			keyCollector.Reset()
+		})
+	})
+
+	Context("ready cluster has an allocatable capacity change", func() {
+		BeforeAll(func() {
+			Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
+
+			// Retrieve the cluster.
+			memberCluster := &clusterv1beta1.MemberCluster{}
+			Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")
+
+			// Update the allocatable capacity.
+			memberCluster.Status.ResourceUsage.Allocatable = corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("1000m"),
+				corev1.ResourceMemory: resource.MustParse("1Gi"),
+			}
+			Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster allocatable capacity")
+		})
+
+		It("should enqueue CRPs (case 1a)", func() {
+			Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+			Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+		})
+
+		It("can empty the key collector", func() {
+			keyCollector.Reset()
+			Eventually(noKeyEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is not empty")
+			Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
+		})
+
+		It("can update the allocatable capacity", func() {
+			// Retrieve the cluster.
+			memberCluster := &clusterv1beta1.MemberCluster{}
+			Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")
+
+			// Update the allocatable capacity.
+			memberCluster.Status.ResourceUsage.Allocatable = corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("2000m"),
+				corev1.ResourceMemory: resource.MustParse("2Gi"),
+			}
+			Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster allocatable capacity")
+		})
+
+		It("should enqueue CRPs (case 1a)", func() {
+			Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+			Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+		})
+
+		AfterAll(func() {
+			keyCollector.Reset()
+		})
+	})
+
+	Context("ready cluster has an available capacity change", func() {
+		BeforeAll(func() {
+			Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
+
+			// Retrieve the cluster.
+			memberCluster := &clusterv1beta1.MemberCluster{}
+			Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")
+
+			// Update the available capacity.
+			memberCluster.Status.ResourceUsage.Available = corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("1000m"),
+				corev1.ResourceMemory: resource.MustParse("1Gi"),
+			}
+			Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster available capacity")
+		})
+
+		It("should enqueue CRPs (case 1a)", func() {
+			Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+			Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+		})
+
+		It("can empty the key collector", func() {
+			keyCollector.Reset()
+			Eventually(noKeyEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is not empty")
+			Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
+		})
+
+		It("can update the available capacity", func() {
+			// Retrieve the cluster.
+			memberCluster := &clusterv1beta1.MemberCluster{}
+			Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")
+
+			// Update the available capacity.
+			memberCluster.Status.ResourceUsage.Available = corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("2000m"),
+				corev1.ResourceMemory: resource.MustParse("2Gi"),
+			}
+			Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster available capacity")
+		})
+
+		It("should enqueue CRPs (case 1a)", func() {
+			Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+			Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
+		})
+
+		AfterAll(func() {
+			keyCollector.Reset()
+		})
+	})
+
 	Context("ready cluster is out of sync", func() {
 		BeforeAll(func() {
 			Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")