diff --git a/pkg/propertyprovider/aks/controllers/node.go b/pkg/propertyprovider/aks/controllers/node.go
new file mode 100644
index 000000000..e7891a128
--- /dev/null
+++ b/pkg/propertyprovider/aks/controllers/node.go
@@ -0,0 +1,86 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+// Package controllers features a number of controllers that are in use
+// by the AKS property provider.
+package controllers
+
+import (
+	"context"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"go.goms.io/fleet/pkg/propertyprovider/aks/trackers"
+)
+
+// NodeReconciler reconciles Node objects.
+type NodeReconciler struct {
+	NT     *trackers.NodeTracker
+	Client client.Client
+}
+
+// Reconcile reconciles a node object.
+func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	nodeRef := klog.KRef(req.Namespace, req.Name)
+	startTime := time.Now()
+	klog.V(2).InfoS("Reconciliation starts for node objects in the AKS property provider", "node", nodeRef)
+	defer func() {
+		latency := time.Since(startTime).Milliseconds()
+		klog.V(2).InfoS("Reconciliation ends for node objects in the AKS property provider", "node", nodeRef, "latency", latency)
+	}()
+
+	// Retrieve the node object.
+	node := &corev1.Node{}
+	if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil {
+		// Failed to get the node object; this signals that the node should be untracked.
+		if errors.IsNotFound(err) {
+			// This branch essentially processes the node deletion event (the actual deletion).
+			// At this point the node may not have been tracked by the tracker at all; if that's
+			// the case, the removal (untracking) operation is a no-op.
+			//
+			// Note that this controller will not add any finalizer to node objects, so as to
+			// avoid blocking normal Kubernetes operations under unexpected circumstances.
+			klog.V(2).InfoS("Node is not found; untrack it from the property provider", "node", nodeRef)
+			r.NT.Remove(req.Name)
+			return ctrl.Result{}, nil
+		}
+		// For other errors, retry the reconciliation.
+		klog.ErrorS(err, "Failed to get the node object", "node", nodeRef)
+		return ctrl.Result{}, err
+	}
+
+	// Note that this controller will not untrack a node when it is first marked for deletion;
+	// instead, it performs the untracking when the node object is actually gone from the
+	// etcd store. This is intentional, as when a node is marked for deletion, workloads might
+	// not have been drained from it, and untracking the node too early might lead to a
+	// case of temporary inconsistency where the amount of requested resources exceeds the
+	// allocatable capacity.
+
+	// Track the node. If it has been tracked, update its total and allocatable capacity
+	// information with the tracker.
+	//
+	// Note that normally the capacity information remains immutable after object
+	// creation; the tracker update only serves as a sanity check.
+	//
+	// Also note that the tracker will attempt to track the node even if it has been
+	// marked for deletion, cordoned, or marked as unschedulable. This behavior is consistent
+	// with the original Fleet setup.
+	klog.V(2).InfoS("Attempt to track the node", "node", nodeRef)
+	r.NT.AddOrUpdate(node)
+
+	return ctrl.Result{}, nil
+}
+
+func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	// Reconcile any node changes (create, update, delete).
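+	//
+	// No event filters (predicates) are registered here, so every node create, update, and
+	// delete event in the cluster triggers a reconciliation; the reconciliation itself is
+	// idempotent (it only calls AddOrUpdate or Remove on the tracker), so repeated events
+	// are harmless beyond their processing cost.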
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&corev1.Node{}).
+		Complete(r)
+}
diff --git a/pkg/propertyprovider/aks/controllers/pod.go b/pkg/propertyprovider/aks/controllers/pod.go
new file mode 100644
index 000000000..bbeba322c
--- /dev/null
+++ b/pkg/propertyprovider/aks/controllers/pod.go
@@ -0,0 +1,98 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the MIT license.
+*/
+
+// Package controllers features a number of controllers that are in use
+// by the AKS property provider.
+package controllers
+
+import (
+	"context"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"go.goms.io/fleet/pkg/propertyprovider/aks/trackers"
+)
+
+// TO-DO (chenyu1): this is a relatively expensive watcher, due to how frequently pods can change
+// in a Kubernetes cluster; unfortunately, at this moment there does not seem to be a better way
+// to observe the changes of requested resources in a cluster. The alternative, which is to use
+// Lists, adds too much overhead to the API server.
+
+// PodReconciler reconciles Pod objects.
+type PodReconciler struct {
+	PT     *trackers.PodTracker
+	Client client.Client
+}
+
+// Reconcile reconciles a pod object.
+func (p *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	podRef := klog.KRef(req.Namespace, req.Name)
+	startTime := time.Now()
+	klog.V(2).InfoS("Reconciliation starts for pod objects in the AKS property provider", "pod", podRef)
+	defer func() {
+		latency := time.Since(startTime).Milliseconds()
+		klog.V(2).InfoS("Reconciliation ends for pod objects in the AKS property provider", "pod", podRef, "latency", latency)
+	}()
+
+	// Retrieve the pod object.
+	pod := &corev1.Pod{}
+	if err := p.Client.Get(ctx, req.NamespacedName, pod); err != nil {
+		// Failed to get the pod object.
+		if errors.IsNotFound(err) {
+			// This branch essentially processes the pod deletion event (the actual deletion).
+			// At this point the pod may not have been tracked by the tracker at all; if that's
+			// the case, the removal (untracking) operation is a no-op.
+			//
+			// Note that this controller will not add any finalizer to pod objects, so as to
+			// avoid blocking normal Kubernetes operations under unexpected circumstances.
+			p.PT.Remove(req.NamespacedName.String())
+			return ctrl.Result{}, nil
+		}
+
+		// For other errors, retry the reconciliation.
+		klog.ErrorS(err, "Failed to get the pod object", "pod", podRef)
+		return ctrl.Result{}, err
+	}
+
+	// Note that this controller will not untrack a pod when it is first marked for deletion;
+	// instead, it performs the untracking when the pod object is actually gone from the
+	// etcd store. This is intentional, as when a pod is marked for deletion, its workloads might
+	// not have been successfully terminated yet, and untracking the pod too early might lead to a
+	// case of temporary inconsistency.
+
+	// Track the pod if:
+	//
+	// * it is **NOT** in the Succeeded or Failed state; and
+	// * it has been assigned to a node.
+	//
+	// This behavior is consistent with how the Kubernetes CLI tool reports requested capacity
+	// on a specific node (`kubectl describe node` command).
+	//
+	// Note that the tracker will attempt to track the pod even if it has been marked for deletion.
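+	//
+	// Pods in the Succeeded or Failed phase no longer hold their requested resources on the
+	// node, and pods without a node assignment do not consume any node capacity yet; both are
+	// therefore excluded from the requested-capacity tally, mirroring the non-terminated pod
+	// calculation performed by `kubectl describe node`.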
+ if len(pod.Spec.NodeName) > 0 && pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed { + klog.V(2).InfoS("Attempt to track the pod", "pod", podRef) + p.PT.AddOrUpdate(pod) + } else { + // Untrack the pod. + // + // It may have been descheduled, or transited into a terminal state. + klog.V(2).InfoS("Untrack the pod", "pod", podRef) + p.PT.Remove(req.NamespacedName.String()) + } + + return ctrl.Result{}, nil +} + +func (p *PodReconciler) SetupWithManager(mgr ctrl.Manager) error { + // Reconcile any pod changes (create, update, delete). + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Pod{}). + Complete(p) +} diff --git a/pkg/propertyprovider/aks/provider.go b/pkg/propertyprovider/aks/provider.go new file mode 100644 index 000000000..e205eea29 --- /dev/null +++ b/pkg/propertyprovider/aks/provider.go @@ -0,0 +1,336 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +// Package aks features the AKS property provider for Fleet. +package aks + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" + "go.goms.io/fleet/pkg/propertyprovider" + "go.goms.io/fleet/pkg/propertyprovider/aks/controllers" + "go.goms.io/fleet/pkg/propertyprovider/aks/trackers" +) + +const ( + // A list of properties that the AKS property provider collects. + + // NodeCountProperty is a property that describes the number of nodes in the cluster. + NodeCountProperty = "kubernetes.azure.com/node-count" + // PerCPUCoreCostProperty is a property that describes the average hourly cost of a CPU core in + // a Kubernetes cluster. + PerCPUCoreCostProperty = "kubernetes.azure.com/per-cpu-core-cost" + // PerGBMemoryCostProperty is a property that describes the average cost of one GB of memory in + // a Kubernetes cluster. + PerGBMemoryCostProperty = "kubernetes.azure.com/per-gb-memory-cost" + + // The resource properties. + TotalCPUCapacityProperty = "resources.kubernetes-fleet.io/total-cpu" + AllocatableCPUCapacityProperty = "resources.kubernetes-fleet.io/allocatable-cpu" + AvailableCPUCapacityProperty = "resources.kubernetes-fleet.io/available-cpu" + + TotalMemoryCapacityProperty = "resources.kubernetes-fleet.io/total-memory" + AllocatableMemoryCapacityProperty = "resources.kubernetes-fleet.io/allocatable-memory" + AvailableMemoryCapacityProperty = "resources.kubernetes-fleet.io/available-memory" + + CostPrecisionTemplate = "%.3f" +) + +const ( + // The condition related values in use by the AKS property provider. + + // PropertyCollectionSucceededConditionType is a condition type that indicates whether a + // property collection attempt has succeeded. 
+ PropertyCollectionSucceededConditionType = "AKSClusterPropertyCollectionSucceeded" + PropertyCollectionSucceededReason = "AllPropertiesCollectedSuccessfully" + PropertyCollectionFailedCostErrorReason = "FailedToCollectCosts" + PropertyCollectionSucceededMessage = "All properties have been collected successfully" + PropertyCollectionFailedCostErrorMessageTemplate = "An error has occurred when collecting cost properties: %v" +) + +// PropertyProvider is the AKS property provider for Fleet. +type PropertyProvider struct { + // The trackers. + podTracker *trackers.PodTracker + nodeTracker *trackers.NodeTracker + + // The region where the AKS property provider resides. + // + // This is necessary as the pricing client requires that a region to be specified; it can + // be either specified by the user or auto-discovered from the AKS cluster. + region *string + + // The controller manager in use by the AKS property provider; this field is mostly reserved for + // testing purposes. + mgr ctrl.Manager +} + +// Verify that the AKS property provider implements the MetricProvider interface at compile time. +var _ propertyprovider.PropertyProvider = &PropertyProvider{} + +// Start starts the AKS property provider. +func (p *PropertyProvider) Start(ctx context.Context, config *rest.Config) error { + klog.V(2).Info("Starting AKS property provider") + + mgr, err := ctrl.NewManager(config, ctrl.Options{ + Scheme: scheme.Scheme, + // Disable metric serving for the AKS property provider controller manager. + // + // Note that this will not stop the metrics from being collected and exported; as they + // are registered via a top-level variable as a part of the controller runtime package, + // which is also used by the Fleet member agent. + Metrics: metricsserver.Options{ + BindAddress: "0", + }, + // Disable health probe serving for the AKS property provider controller manager. + HealthProbeBindAddress: "0", + // Disable leader election for the AKS property provider. + // + // Note that for optimal performance, only the running instance of the Fleet member agent + // (if there are multiple ones) should have the AKS property provider enabled; this can + // be achieved by starting the AKS property provider only when an instance of the Fleet + // member agent wins the leader election. It should be noted that running the AKS property + // provider for multiple times will not incur any side effect other than some minor + // performance costs, as at this moment the AKS property provider observes data individually + // in a passive manner with no need for any centralized state. + LeaderElection: false, + }) + p.mgr = mgr + + if err != nil { + klog.ErrorS(err, "Failed to start AKS property provider") + return err + } + + if p.region == nil { + klog.V(2).Info("Auto-discover region as none has been specified") + // Note that an API reader is passed here for the purpose of auto-discovering region + // information from AKS nodes; at this time the cache from the controller manager + // has not been initialized yet and as a result cached client is not yet available. + // + // This incurs the slightly higher overhead, however, as auto-discovery runs only + // once, the performance impact is negligible. 
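+		//
+		// mgr.GetAPIReader() returns a client that reads directly from the API server rather
+		// than from the manager's (not yet started) cache, so the node list used for
+		// auto-discovery does not depend on cache synchronization.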
+ discoveredRegion, err := p.autoDiscoverRegionAndSetupTrackers(ctx, mgr.GetAPIReader()) + if err != nil { + klog.ErrorS(err, "Failed to auto-discover region for the AKS property provider") + return err + } + p.region = discoveredRegion + } + klog.V(2).Infof("Starting with the specified region %s", *p.region) + pp := trackers.NewAKSKarpenterPricingClient(ctx, *p.region) + p.podTracker = trackers.NewPodTracker() + p.nodeTracker = trackers.NewNodeTracker(pp) + + // Set up the node and pod reconcilers. + klog.V(2).Info("Starting the node reconciler") + nodeReconciler := &controllers.NodeReconciler{ + NT: p.nodeTracker, + Client: mgr.GetClient(), + } + if err := nodeReconciler.SetupWithManager(mgr); err != nil { + klog.ErrorS(err, "Failed to start the node reconciler in the AKS property provider") + return err + } + + klog.V(2).Info("Starting the pod reconciler") + podReconciler := &controllers.PodReconciler{ + PT: p.podTracker, + Client: mgr.GetClient(), + } + if err := podReconciler.SetupWithManager(mgr); err != nil { + klog.ErrorS(err, "Failed to start the pod reconciler in the AKS property provider") + return err + } + + // Start the controller manager. + // + // Note that the controller manager will run in a separate goroutine to avoid blocking + // the member agent. + go func() { + // This call will block until the context exits. + if err := mgr.Start(ctx); err != nil { + klog.ErrorS(err, "Failed to start the AKS property provider controller manager") + } + }() + + // Wait for the cache to sync. + // + // Note that this does not guarantee that any of the object changes has actually been + // processed; it only implies that an initial state has been populated. Though for our + // use case it might be good enough, considering that the only side effect is that + // some exported properties might be skewed initially (e.g., nodes/pods not being tracked). + // + // An alternative is to perform a list for once during the startup, which might be + // too expensive for a large cluster. + mgr.GetCache().WaitForCacheSync(ctx) + + return nil +} + +// Collect collects the properties of an AKS cluster. +func (p *PropertyProvider) Collect(_ context.Context) propertyprovider.PropertyCollectionResponse { + conds := make([]metav1.Condition, 0, 1) + + // Collect the non-resource properties. + properties := make(map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue) + properties[NodeCountProperty] = clusterv1beta1.PropertyValue{ + Value: fmt.Sprintf("%d", p.nodeTracker.NodeCount()), + ObservationTime: metav1.Now(), + } + + perCPUCost, perGBMemoryCost, err := p.nodeTracker.Costs() + if err != nil { + // Note that the last transition time is not tracked here, as the provider does not + // track the previously returned condition. A timestamp will be added in the upper layer. + conds = append(conds, metav1.Condition{ + Type: PropertyCollectionSucceededConditionType, + Status: metav1.ConditionFalse, + Reason: "FailedToCollectCosts", + Message: fmt.Sprintf(PropertyCollectionFailedCostErrorMessageTemplate, err), + }) + } else { + properties[PerCPUCoreCostProperty] = clusterv1beta1.PropertyValue{ + Value: fmt.Sprintf(CostPrecisionTemplate, perCPUCost), + ObservationTime: metav1.Now(), + } + properties[PerGBMemoryCostProperty] = clusterv1beta1.PropertyValue{ + Value: fmt.Sprintf(CostPrecisionTemplate, perGBMemoryCost), + ObservationTime: metav1.Now(), + } + } + + // Collect the resource properties. 
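+	//
+	// For each resource, available capacity is derived as (total allocatable across tracked
+	// nodes) minus (total requested across tracked pods), clamped at zero; for example, with
+	// 10.2 CPU cores allocatable and 8.5 cores requested, available-cpu is reported as 1.7.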
+ resources := clusterv1beta1.ResourceUsage{} + resources.Capacity = p.nodeTracker.TotalCapacity() + resources.Allocatable = p.nodeTracker.TotalAllocatable() + + requested := p.podTracker.TotalRequested() + available := make(corev1.ResourceList) + for rn := range resources.Allocatable { + left := resources.Allocatable[rn].DeepCopy() + // In some unlikely scenarios, it could happen that, due to unavoidable + // inconsistencies in the data collection process, the total value of a specific + // requested resource exceeds that of the allocatable resource, as observed by + // the property provider; for example, the node tracker might fail to track a node + // in time yet the some pods have been assigned to the pod and gets tracked by + // the pod tracker. In such cases, the property provider will report a zero + // value for the resource; and this occurrence should get fixed in the next (few) + // property collection iterations. + if left.Cmp(requested[rn]) > 0 { + left.Sub(requested[rn]) + } else { + left = resource.Quantity{} + } + available[rn] = left + } + resources.Available = available + + // If no errors are found, report a success as a condition. + if len(conds) == 0 { + // Note that the last transition time is not tracked here, as the provider does not + // track the previously returned condition. A timestamp will be added in the upper layer. + conds = append(conds, metav1.Condition{ + Type: PropertyCollectionSucceededConditionType, + Status: metav1.ConditionTrue, + Reason: PropertyCollectionSucceededReason, + Message: PropertyCollectionSucceededMessage, + }) + } + + // Return the collection response. + return propertyprovider.PropertyCollectionResponse{ + Properties: properties, + Resources: resources, + Conditions: conds, + } +} + +// autoDiscoverRegionAndSetupTrackers auto-discovers the region of the AKS cluster. +func (p *PropertyProvider) autoDiscoverRegionAndSetupTrackers(ctx context.Context, c client.Reader) (*string, error) { + klog.V(2).Info("Auto-discover region for the AKS property provider") + // Auto-discover the region by listing the nodes. + nodeList := &corev1.NodeList{} + // List only one node to reduce performance impact (if supported). + // + // By default an AKS cluster always has at least one node; all nodes should be in the same + // region and has the topology label set. + req, err := labels.NewRequirement(corev1.LabelTopologyRegion, selection.Exists, []string{}) + if err != nil { + // This should never happen. + err := fmt.Errorf("failed to create a label requirement: %w", err) + klog.Error(err) + return nil, err + } + listOptions := client.ListOptions{ + LabelSelector: labels.NewSelector().Add(*req), + Limit: 1, + } + if err := c.List(ctx, nodeList, &listOptions); err != nil { + err := fmt.Errorf("failed to list nodes with the region label: %w", err) + klog.Error(err) + return nil, err + } + + // If no nodes are found, return an error. + if len(nodeList.Items) == 0 { + err := fmt.Errorf("no nodes found with the region label") + klog.Error(err) + return nil, err + } + + // Extract the region from the first node via the region label. + node := nodeList.Items[0] + nodeRegion, found := node.Labels[corev1.LabelTopologyRegion] + if !found { + // The region label is absent; normally this should never occur. 
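+		// (corev1.LabelTopologyRegion is the well-known topology.kubernetes.io/region label,
+		// which is expected to be present on every AKS node.)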
+ err := fmt.Errorf("region label is absent on node %s", node.Name) + klog.Error(err) + return nil, err + } + klog.V(2).InfoS("Auto-discovered region for the AKS property provider", "region", nodeRegion) + + return &nodeRegion, nil +} + +// New returns a new AKS property provider using the default pricing provider, which is, +// at this moment, an AKS Karpenter pricing client. +// +// If the region is unspecified at the time when this function is called, the provider +// will attempt to auto-discover the region of its host cluster when the Start method is +// called. +func New(region *string) propertyprovider.PropertyProvider { + return &PropertyProvider{ + region: region, + } +} + +// NewWithPricingProvider returns a new AKS property provider with the given +// pricing provider. +// +// This is mostly used for allow plugging in of alternate pricing providers (one that +// does not use the Karpenter client), and for testing purposes. +func NewWithPricingProvider(pp trackers.PricingProvider) propertyprovider.PropertyProvider { + return &PropertyProvider{ + podTracker: trackers.NewPodTracker(), + nodeTracker: trackers.NewNodeTracker(pp), + region: ptr.To("preset"), + } +} diff --git a/pkg/propertyprovider/aks/provider_integration_test.go b/pkg/propertyprovider/aks/provider_integration_test.go new file mode 100644 index 000000000..7e8c7312d --- /dev/null +++ b/pkg/propertyprovider/aks/provider_integration_test.go @@ -0,0 +1,570 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package aks + +import ( + "fmt" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" + "go.goms.io/fleet/pkg/propertyprovider" + "go.goms.io/fleet/pkg/propertyprovider/aks/trackers" +) + +var ( + ignoreObservationTimeFieldInPropertyValue = cmpopts.IgnoreFields(clusterv1beta1.PropertyValue{}, "ObservationTime") +) + +var ( + nodeDeletedActual = func(name string) func() error { + return func() error { + node := &corev1.Node{} + if err := memberClient.Get(ctx, types.NamespacedName{Name: name}, node); !errors.IsNotFound(err) { + return fmt.Errorf("node has not been deleted yet") + } + return nil + } + } + podDeletedActual = func(namespace, name string) func() error { + return func() error { + pod := &corev1.Pod{} + if err := memberClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, pod); !errors.IsNotFound(err) { + return fmt.Errorf("pod has not been deleted yet") + } + return nil + } + } + + shouldCreateNodes = func(nodes ...corev1.Node) func() { + return func() { + for idx := range nodes { + node := nodes[idx].DeepCopy() + Expect(memberClient.Create(ctx, node)).To(Succeed(), "Failed to create node") + + Expect(memberClient.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed(), "Failed to get node") + node.Status.Allocatable = nodes[idx].Status.Allocatable.DeepCopy() + node.Status.Capacity = nodes[idx].Status.Capacity.DeepCopy() + Expect(memberClient.Status().Update(ctx, node)).To(Succeed(), "Failed to update node status") + } + } + } + shouldDeleteNodes = func(nodes ...corev1.Node) func() { + return func() { + for idx := range nodes { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodes[idx].Name, + }, + } + 
Expect(memberClient.Delete(ctx, node)).To(SatisfyAny(Succeed(), MatchError(errors.IsNotFound)), "Failed to delete node") + + // Wait for the node to be deleted. + Eventually(nodeDeletedActual(node.Name), eventuallyDuration, eventuallyInterval).Should(BeNil()) + } + } + } + shouldCreatePods = func(pods ...corev1.Pod) func() { + return func() { + for idx := range pods { + pod := pods[idx].DeepCopy() + Expect(memberClient.Create(ctx, pod)).To(Succeed(), "Failed to create pod") + } + } + } + shouldBindPods = func(pods ...corev1.Pod) func() { + return func() { + for idx := range pods { + pod := pods[idx].DeepCopy() + binding := &corev1.Binding{ + Target: corev1.ObjectReference{ + Name: pod.Spec.NodeName, + }, + } + Expect(memberClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod)).To(Succeed(), "Failed to get pod") + Expect(memberClient.SubResource("binding").Create(ctx, pod, binding)).To(Succeed(), "Failed to bind pod to node") + } + } + } + shouldDeletePods = func(pods ...corev1.Pod) func() { + return func() { + for idx := range pods { + pod := pods[idx].DeepCopy() + Expect(memberClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod)).To(Succeed(), "Failed to get pod") + + // Transition the pod into a terminal state (if it has not been done). + if pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed { + pod.Status.Phase = corev1.PodSucceeded + Expect(memberClient.Status().Update(ctx, pod)).To(Succeed(), "Failed to update pod status") + } + + // Delete the pod. + Expect(memberClient.Delete(ctx, pod)).To(SatisfyAny(Succeed(), MatchError(errors.IsNotFound)), "Failed to delete pod") + + // Wait for the pod to be deleted. + Eventually(podDeletedActual(pod.Namespace, pod.Name), eventuallyDuration, eventuallyInterval).Should(BeNil()) + } + } + + } + shouldReportCorrectPropertiesForNodes = func(nodes []corev1.Node, pods []corev1.Pod) func() { + return func() { + totalCPUCapacity := resource.Quantity{} + allocatableCPUCapacity := resource.Quantity{} + totalMemoryCapacity := resource.Quantity{} + allocatableMemoryCapacity := resource.Quantity{} + + for idx := range nodes { + node := nodes[idx] + totalCPUCapacity.Add(node.Status.Capacity[corev1.ResourceCPU]) + allocatableCPUCapacity.Add(node.Status.Allocatable[corev1.ResourceCPU]) + totalMemoryCapacity.Add(node.Status.Capacity[corev1.ResourceMemory]) + allocatableMemoryCapacity.Add(node.Status.Allocatable[corev1.ResourceMemory]) + } + + totalCPUCores := totalCPUCapacity.AsApproximateFloat64() + totalMemoryBytes := totalMemoryCapacity.AsApproximateFloat64() + totalMemoryGBs := totalMemoryBytes / (1024.0 * 1024.0 * 1024.0) + + requestedCPUCapacity := resource.Quantity{} + requestedMemoryCapacity := resource.Quantity{} + for idx := range pods { + pod := pods[idx] + for cidx := range pod.Spec.Containers { + c := pod.Spec.Containers[cidx] + requestedCPUCapacity.Add(c.Resources.Requests[corev1.ResourceCPU]) + requestedMemoryCapacity.Add(c.Resources.Requests[corev1.ResourceMemory]) + } + } + + availableCPUCapacity := allocatableCPUCapacity.DeepCopy() + availableCPUCapacity.Sub(requestedCPUCapacity) + availableMemoryCapacity := allocatableMemoryCapacity.DeepCopy() + availableMemoryCapacity.Sub(requestedMemoryCapacity) + + Eventually(func() error { + // Calculate the costs manually; hardcoded values cannot be used as Azure pricing + // is subject to periodic change. 
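				//
				// The expected figures follow the provider's cost model: the hourly on-demand
				// prices of all nodes are summed, then divided by the total CPU cores (for the
				// per-core cost) and by the total memory expressed in GiB (for the per-GB cost).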
+ + // Note that this is done within an eventually block to ensure that the + // calculation is done using the latest pricing data. Inconsistency + // should seldom occur though. + totalCost := 0.0 + for idx := range nodes { + node := nodes[idx] + cost, found := pp.OnDemandPrice(node.Labels[trackers.AKSClusterNodeSKULabelName]) + if !found { + return fmt.Errorf("on-demand price not found for SKU %s", node.Labels[trackers.AKSClusterNodeSKULabelName]) + } + totalCost += cost + } + perCPUCost := fmt.Sprintf(CostPrecisionTemplate, totalCost/totalCPUCores) + perGBMemoryCost := fmt.Sprintf(CostPrecisionTemplate, totalCost/totalMemoryGBs) + + expectedRes := propertyprovider.PropertyCollectionResponse{ + Properties: map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{ + NodeCountProperty: { + Value: fmt.Sprintf("%d", len(nodes)), + }, + PerCPUCoreCostProperty: { + Value: perCPUCost, + }, + PerGBMemoryCostProperty: { + Value: perGBMemoryCost, + }, + }, + Resources: clusterv1beta1.ResourceUsage{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: totalCPUCapacity, + corev1.ResourceMemory: totalMemoryCapacity, + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: allocatableCPUCapacity, + corev1.ResourceMemory: allocatableMemoryCapacity, + }, + Available: corev1.ResourceList{ + corev1.ResourceCPU: availableCPUCapacity, + corev1.ResourceMemory: availableMemoryCapacity, + }, + }, + Conditions: []metav1.Condition{ + { + Type: PropertyCollectionSucceededConditionType, + Status: metav1.ConditionTrue, + Reason: PropertyCollectionSucceededReason, + Message: PropertyCollectionSucceededMessage, + }, + }, + } + + res := p.Collect(ctx) + if diff := cmp.Diff(res, expectedRes, ignoreObservationTimeFieldInPropertyValue); diff != "" { + return fmt.Errorf("property collection response (-got, +want):\n%s", diff) + } + return nil + }, eventuallyDuration, eventuallyInterval).Should(BeNil()) + } + } +) + +var ( + // The nodes below use actual capacities of their respective AKS SKUs; for more information, + // see: + // https://learn.microsoft.com/en-us/azure/virtual-machines/av2-series (for A-series nodes), + // https://learn.microsoft.com/en-us/azure/virtual-machines/sizes-b-series-burstable (for B-series nodes), and + // https://learn.microsoft.com/en-us/azure/virtual-machines/dv2-dsv2-series (for D/DS v2 series nodes). 
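	// The SKU labels assigned below correspond to the aksNodeSKU1/2/3 constants defined in the
	// test suite (Standard_B4ms, Standard_A4_v2, and Standard_DS2_v2), so the pricing client can
	// resolve an on-demand price for every fixture node.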
+ nodes = []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName1, + Labels: map[string]string{ + trackers.AKSClusterNodeSKULabelName: aksNodeSKU1, + }, + }, + Spec: corev1.NodeSpec{}, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("8130080Ki"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3860m"), + corev1.ResourceMemory: resource.MustParse("5474848Ki"), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName2, + Labels: map[string]string{ + trackers.AKSClusterNodeSKULabelName: aksNodeSKU2, + }, + }, + Spec: corev1.NodeSpec{}, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("16374624Ki"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3860m"), + corev1.ResourceMemory: resource.MustParse("12880736Ki"), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName3, + Labels: map[string]string{ + trackers.AKSClusterNodeSKULabelName: aksNodeSKU3, + }, + }, + Spec: corev1.NodeSpec{}, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("7097684Ki"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1900m"), + corev1.ResourceMemory: resource.MustParse("4652372Ki"), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName4, + Labels: map[string]string{ + trackers.AKSClusterNodeSKULabelName: aksNodeSKU3, + }, + }, + Spec: corev1.NodeSpec{}, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("7097684Ki"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1900m"), + corev1.ResourceMemory: resource.MustParse("4652372Ki"), + }, + }, + }, + } + + pods = []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: podName1, + Namespace: namespaceName1, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName1, + Containers: []corev1.Container{ + { + Name: containerName1, + Image: imageName, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("500Mi"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: podName2, + Namespace: namespaceName2, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName2, + Containers: []corev1.Container{ + { + Name: containerName2, + Image: imageName, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1.5"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: podName3, + Namespace: namespaceName3, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName3, + Containers: []corev1.Container{ + { + Name: containerName3, + Image: imageName, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("3Gi"), + }, + }, + }, + { + Name: containerName4, + Image: imageName, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: 
resource.MustParse("2Gi"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: podName4, + Namespace: namespaceName3, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName4, + Containers: []corev1.Container{ + { + Name: containerName3, + Image: imageName, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("3Gi"), + }, + }, + }, + { + Name: containerName4, + Image: imageName, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + }, + }, + }, + }, + } +) + +// All the test cases in this block are serial + ordered, as manipulation of nodes/pods +// in one test case will disrupt another. +var _ = Describe("aks property provider", func() { + Context("add a new node", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes[0])) + + It("should report correct properties", shouldReportCorrectPropertiesForNodes(nodes[0:1], nil)) + + AfterAll(shouldDeleteNodes(nodes[0])) + }) + + Context("add multiple nodes", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + It("should report correct properties", shouldReportCorrectPropertiesForNodes(nodes, nil)) + + AfterAll(shouldDeleteNodes(nodes...)) + }) + + Context("remove a node", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + It("should report correct properties", shouldReportCorrectPropertiesForNodes(nodes, nil)) + + It("can delete a node", shouldDeleteNodes(nodes[0])) + + It("should report correct properties after deletion", shouldReportCorrectPropertiesForNodes(nodes[1:], nil)) + + AfterAll(shouldDeleteNodes(nodes[1:]...)) + }) + + Context("remove multiple nodes", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + It("should report correct properties", shouldReportCorrectPropertiesForNodes(nodes, nil)) + + It("can delete multiple nodes", shouldDeleteNodes(nodes[0], nodes[3])) + + It("should report correct properties after deletion", shouldReportCorrectPropertiesForNodes(nodes[1:3], nil)) + + AfterAll(shouldDeleteNodes(nodes[1], nodes[2])) + }) + + Context("add a pod", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + BeforeAll(shouldCreatePods(pods[0])) + + It("should report correct properties (pod bound)", shouldReportCorrectPropertiesForNodes(nodes, pods[0:1])) + + AfterAll(shouldDeletePods(pods[0])) + + AfterAll(shouldDeleteNodes(nodes...)) + }) + + Context("add a pod (not bound)", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + BeforeAll(func() { + pod := pods[0].DeepCopy() + pod.Spec.NodeName = "" + Expect(memberClient.Create(ctx, pod)).To(Succeed(), "Failed to create pod") + }) + + It("should report correct properties (pod not bound)", shouldReportCorrectPropertiesForNodes(nodes, nil)) + + It("can bind the pod", shouldBindPods(pods[0])) + + It("should report correct properties (pod bound)", shouldReportCorrectPropertiesForNodes(nodes, pods[0:1])) + + AfterAll(shouldDeletePods(pods[0])) + + AfterAll(shouldDeleteNodes(nodes...)) + }) + + Context("add multiple pods", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + BeforeAll(shouldCreatePods(pods...)) + + It("should report correct properties (pods bound)", shouldReportCorrectPropertiesForNodes(nodes, pods)) + + AfterAll(shouldDeletePods(pods...)) + + AfterAll(shouldDeleteNodes(nodes...)) + }) + + 
Context("remove a pod (deleted)", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + BeforeAll(shouldCreatePods(pods[0])) + + It("should report correct properties (pod bound)", shouldReportCorrectPropertiesForNodes(nodes, pods[0:1])) + + It("can delete the pod", shouldDeletePods(pods[0])) + + It("should report correct properties (pod deleted)", shouldReportCorrectPropertiesForNodes(nodes, nil)) + + AfterAll(shouldDeleteNodes(nodes...)) + }) + + Context("remove a pod (succeeded)", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + BeforeAll(shouldCreatePods(pods[0])) + + It("should report correct properties (pod bound)", shouldReportCorrectPropertiesForNodes(nodes, pods[0:1])) + + It("can transition the pod to the succeeded state", func() { + pod := pods[0].DeepCopy() + pod.Status.Phase = corev1.PodSucceeded + Expect(memberClient.Status().Update(ctx, pod)).To(Succeed(), "Failed to update pod status") + }) + + It("should report correct properties (pod succeeded)", shouldReportCorrectPropertiesForNodes(nodes, nil)) + + AfterAll(shouldDeletePods(pods[0])) + + AfterAll(shouldDeleteNodes(nodes...)) + }) + + Context("remove a pod (failed)", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + BeforeAll(shouldCreatePods(pods[0])) + + It("should report correct properties (pod bound)", shouldReportCorrectPropertiesForNodes(nodes, pods[0:1])) + + It("can transition the pod to the failed state", func() { + pod := pods[0].DeepCopy() + pod.Status.Phase = corev1.PodFailed + Expect(memberClient.Status().Update(ctx, pod)).To(Succeed(), "Failed to update pod status") + }) + + It("should report correct properties (pod failed)", shouldReportCorrectPropertiesForNodes(nodes, nil)) + + AfterAll(shouldDeletePods(pods[0])) + + AfterAll(shouldDeleteNodes(nodes...)) + }) + + Context("remove multiple pods", Serial, Ordered, func() { + BeforeAll(shouldCreateNodes(nodes...)) + + BeforeAll(shouldCreatePods(pods...)) + + It("should report correct properties (pods bound)", shouldReportCorrectPropertiesForNodes(nodes, pods)) + + It("can delete multiple pods", shouldDeletePods(pods[1], pods[2])) + + It("should report correct properties (pods deleted)", shouldReportCorrectPropertiesForNodes(nodes, []corev1.Pod{pods[0], pods[3]})) + + AfterAll(shouldDeletePods(pods[0], pods[3])) + + AfterAll(shouldDeleteNodes(nodes...)) + }) +}) diff --git a/pkg/propertyprovider/aks/provider_test.go b/pkg/propertyprovider/aks/provider_test.go new file mode 100644 index 000000000..cdd3bc323 --- /dev/null +++ b/pkg/propertyprovider/aks/provider_test.go @@ -0,0 +1,357 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. 
+*/ + +package aks + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" + "go.goms.io/fleet/pkg/propertyprovider" + "go.goms.io/fleet/pkg/propertyprovider/aks/trackers" +) + +const ( + nodeName1 = "node-1" + nodeName2 = "node-2" + nodeName3 = "node-3" + nodeName4 = "node-4" + + podName1 = "pod-1" + podName2 = "pod-2" + podName3 = "pod-3" + podName4 = "pod-4" + + containerName1 = "container-1" + containerName2 = "container-2" + containerName3 = "container-3" + containerName4 = "container-4" + + namespaceName1 = "work-1" + namespaceName2 = "work-2" + namespaceName3 = "work-3" + + nodeSKU1 = "Standard_1" + nodeSKU2 = "Standard_2" + + imageName = "nginx" +) + +var ( + currentTime = time.Now() +) + +// dummyPricingProvider is a mock implementation that implements the PricingProvider interface. +type dummyPricingProvider struct{} + +var _ trackers.PricingProvider = &dummyPricingProvider{} + +func (d *dummyPricingProvider) OnDemandPrice(_ string) (float64, bool) { + return 1.0, true +} + +func (d *dummyPricingProvider) LastUpdated() time.Time { + return currentTime +} + +func TestCollect(t *testing.T) { + testCases := []struct { + name string + nodes []corev1.Node + pods []corev1.Pod + wantMetricCollectionResponse propertyprovider.PropertyCollectionResponse + }{ + { + name: "can report properties", + nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName1, + Labels: map[string]string{ + trackers.AKSClusterNodeSKULabelName: nodeSKU1, + }, + }, + Spec: corev1.NodeSpec{}, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("16Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3.2"), + corev1.ResourceMemory: resource.MustParse("15.2Gi"), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName2, + Labels: map[string]string{ + trackers.AKSClusterNodeSKULabelName: nodeSKU2, + }, + }, + Spec: corev1.NodeSpec{}, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("8"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("7"), + corev1.ResourceMemory: resource.MustParse("30Gi"), + }, + }, + }, + }, + pods: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: podName1, + Namespace: namespaceName1, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName1, + Containers: []corev1.Container{ + { + Name: containerName1, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + }, + { + Name: containerName2, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1.5"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: podName2, + Namespace: namespaceName1, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName2, + Containers: []corev1.Container{ + { + Name: containerName3, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("6"), + corev1.ResourceMemory: resource.MustParse("16Gi"), + }, + }, 
+ }, + }, + }, + }, + }, + wantMetricCollectionResponse: propertyprovider.PropertyCollectionResponse{ + Properties: map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{ + NodeCountProperty: { + Value: "2", + }, + PerCPUCoreCostProperty: { + Value: "0.167", + }, + PerGBMemoryCostProperty: { + Value: "0.042", + }, + }, + Resources: clusterv1beta1.ResourceUsage{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("12"), + corev1.ResourceMemory: resource.MustParse("48Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10.2"), + corev1.ResourceMemory: resource.MustParse("45.2Gi"), + }, + Available: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1.7"), + corev1.ResourceMemory: resource.MustParse("23.2Gi"), + }, + }, + Conditions: []metav1.Condition{ + { + Type: PropertyCollectionSucceededConditionType, + Status: metav1.ConditionTrue, + Reason: PropertyCollectionSucceededReason, + Message: PropertyCollectionSucceededMessage, + }, + }, + }, + }, + { + name: "will report zero values if the requested resources exceed the allocatable resources", + nodes: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName1, + Labels: map[string]string{ + trackers.AKSClusterNodeSKULabelName: nodeSKU1, + }, + }, + Spec: corev1.NodeSpec{}, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("16Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3.2"), + corev1.ResourceMemory: resource.MustParse("15.2Gi"), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName2, + Labels: map[string]string{ + trackers.AKSClusterNodeSKULabelName: nodeSKU2, + }, + }, + Spec: corev1.NodeSpec{}, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("8"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("7"), + corev1.ResourceMemory: resource.MustParse("30Gi"), + }, + }, + }, + }, + pods: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: podName1, + Namespace: namespaceName1, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName1, + Containers: []corev1.Container{ + { + Name: containerName1, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10"), + corev1.ResourceMemory: resource.MustParse("20Gi"), + }, + }, + }, + { + Name: containerName2, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("20"), + corev1.ResourceMemory: resource.MustParse("20Gi"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: podName2, + Namespace: namespaceName1, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName2, + Containers: []corev1.Container{ + { + Name: containerName3, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("20"), + corev1.ResourceMemory: resource.MustParse("20Gi"), + }, + }, + }, + }, + }, + }, + }, + wantMetricCollectionResponse: propertyprovider.PropertyCollectionResponse{ + Properties: map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{ + NodeCountProperty: { + Value: "2", + }, + PerCPUCoreCostProperty: { + Value: "0.167", + }, + PerGBMemoryCostProperty: { + Value: "0.042", + }, + }, + Resources: clusterv1beta1.ResourceUsage{ + 
Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("12"), + corev1.ResourceMemory: resource.MustParse("48Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10.2"), + corev1.ResourceMemory: resource.MustParse("45.2Gi"), + }, + Available: corev1.ResourceList{ + corev1.ResourceCPU: resource.Quantity{}, + corev1.ResourceMemory: resource.Quantity{}, + }, + }, + Conditions: []metav1.Condition{ + { + Type: PropertyCollectionSucceededConditionType, + Status: metav1.ConditionTrue, + Reason: PropertyCollectionSucceededReason, + Message: PropertyCollectionSucceededMessage, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + // Build the trackers manually for testing purposes. + nodeTracker := trackers.NewNodeTracker(&dummyPricingProvider{}) + podTracker := trackers.NewPodTracker() + for idx := range tc.nodes { + nodeTracker.AddOrUpdate(&tc.nodes[idx]) + } + for idx := range tc.pods { + podTracker.AddOrUpdate(&tc.pods[idx]) + } + + p := &PropertyProvider{ + nodeTracker: nodeTracker, + podTracker: podTracker, + } + res := p.Collect(ctx) + if diff := cmp.Diff(res, tc.wantMetricCollectionResponse, ignoreObservationTimeFieldInPropertyValue); diff != "" { + t.Fatalf("Collect() property collection response diff (-got, +want):\n%s", diff) + } + }) + } +} diff --git a/pkg/propertyprovider/aks/suite_test.go b/pkg/propertyprovider/aks/suite_test.go new file mode 100644 index 000000000..0414b8db7 --- /dev/null +++ b/pkg/propertyprovider/aks/suite_test.go @@ -0,0 +1,107 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package aks + +import ( + "context" + "path/filepath" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + "go.goms.io/fleet/pkg/propertyprovider" + "go.goms.io/fleet/pkg/propertyprovider/aks/trackers" +) + +const ( + eventuallyDuration = time.Second * 20 + eventuallyInterval = time.Second * 5 +) + +const ( + region = "eastus" + + aksNodeSKU1 = "Standard_B4ms" + aksNodeSKU2 = "Standard_A4_v2" + aksNodeSKU3 = "Standard_DS2_v2" +) + +var ( + memberTestEnv *envtest.Environment + memberClient client.Client + ctx context.Context + cancel context.CancelFunc + p propertyprovider.PropertyProvider + pp trackers.PricingProvider +) + +// setUpResources help set up resources in the test environment. +func setUpResources() { + // Add the namespaces. + namespaceNames := []string{namespaceName1, namespaceName2, namespaceName3} + + for _, name := range namespaceNames { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + Expect(memberClient.Create(ctx, ns)).To(Succeed(), "Failed to create namespace") + } +} + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "AKS Fleet Metric Provider Suite") +} + +var _ = BeforeSuite(func() { + klog.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + ctx, cancel = context.WithCancel(context.TODO()) + + By("Bootstrap the test environment") + + // Start the test cluster. 
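+	//
+	// envtest runs a local kube-apiserver and etcd for the suite; the Fleet CRDs are loaded
+	// from the repository's config/crd/bases directory.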
+ memberTestEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, + } + memberCfg, err := memberTestEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(memberCfg).NotTo(BeNil()) + + // Set up the K8s client for the test cluster. + memberClient, err = client.New(memberCfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(memberClient).NotTo(BeNil()) + + // Set up resources. + setUpResources() + + // Start the AKS property provider. + pp = trackers.NewAKSKarpenterPricingClient(ctx, region) + p = NewWithPricingProvider(pp) + Expect(p.Start(ctx, memberCfg)).To(Succeed()) +}) + +var _ = AfterSuite(func() { + defer klog.Flush() + cancel() + + By("tearing down the test environment") + Expect(memberTestEnv.Stop()).Should(Succeed()) +}) diff --git a/pkg/propertyprovider/aks/trackers/nodes.go b/pkg/propertyprovider/aks/trackers/nodes.go index a9bff965d..e61ad2f8c 100644 --- a/pkg/propertyprovider/aks/trackers/nodes.go +++ b/pkg/propertyprovider/aks/trackers/nodes.go @@ -438,7 +438,8 @@ func (nt *NodeTracker) TotalCapacity() corev1.ResourceList { nt.mu.RLock() defer nt.mu.RUnlock() - return nt.totalCapacity + // Return a deep copy to avoid leaks and consequent potential data race. + return nt.totalCapacity.DeepCopy() } // TotalAllocatable returns the total allocatable capacity of all resources that @@ -447,7 +448,8 @@ func (nt *NodeTracker) TotalAllocatable() corev1.ResourceList { nt.mu.RLock() defer nt.mu.RUnlock() - return nt.totalAllocatable + // Return a deep copy to avoid leaks and consequent potential data race. + return nt.totalAllocatable.DeepCopy() } // Costs returns the per CPU core and per GB memory costs in the cluster. diff --git a/pkg/propertyprovider/aks/trackers/pods.go b/pkg/propertyprovider/aks/trackers/pods.go index d8152c695..8c60442bc 100644 --- a/pkg/propertyprovider/aks/trackers/pods.go +++ b/pkg/propertyprovider/aks/trackers/pods.go @@ -128,5 +128,6 @@ func (pt *PodTracker) TotalRequested() corev1.ResourceList { pt.mu.RLock() defer pt.mu.RUnlock() - return pt.totalRequested + // Return a deep copy to avoid leaks and consequent potential data race. + return pt.totalRequested.DeepCopy() } diff --git a/pkg/propertyprovider/interface.go b/pkg/propertyprovider/interface.go index 2cd8e5f2f..7c9232e7b 100644 --- a/pkg/propertyprovider/interface.go +++ b/pkg/propertyprovider/interface.go @@ -26,6 +26,9 @@ type PropertyCollectionResponse struct { // available capacity. Resources clusterv1beta1.ResourceUsage // Conditions is an array of conditions that explains the property collection status. + // + // Last transition time of each added condition is omitted if set and will instead be added + // by the Fleet member agent. Conditions []metav1.Condition }
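As a point of reference for the interface contract above, here is a minimal, hypothetical sketch of how a consumer of PropertyCollectionResponse might fill in the omitted last transition times when persisting the returned conditions. The package name, the helper name, and the choice of meta.SetStatusCondition as the merge mechanism are illustrative assumptions; only the fact that the Fleet member agent adds the timestamp comes from the comment in the diff.

// Package example holds an illustrative, consumer-side helper; it is not part of this change.
package example

import (
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// mergeCollectedConditions merges the conditions returned by a property collection attempt
// into an existing condition list (for example, a member cluster status).
func mergeCollectedConditions(existing *[]metav1.Condition, collected []metav1.Condition) {
	for _, cond := range collected {
		// SetStatusCondition assigns LastTransitionTime (defaulting to the current time when
		// the incoming condition leaves it unset) whenever a condition is added or its status
		// changes; repeated successful collections therefore do not churn the timestamp.
		meta.SetStatusCondition(existing, cond)
	}
}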