implement reconciliation for VirtualMachineSetResourcePolicy
chrischdi committed Feb 21, 2025
1 parent a0aeaa7 commit c7718eb
Showing 9 changed files with 420 additions and 81 deletions.
2 changes: 1 addition & 1 deletion controllers/vmware/test/controllers_test.go
@@ -478,7 +478,7 @@ var _ = Describe("Reconciliation tests", func() {
Eventually(func() error {
return k8sClient.Get(ctx, rpKey, resourcePolicy)
}, time.Second*30).Should(Succeed())
Expect(len(resourcePolicy.Spec.ClusterModuleGroups)).To(BeEquivalentTo(2))
Expect(len(resourcePolicy.Spec.ClusterModuleGroups)).To(BeEquivalentTo(1))

By("Create the CAPI Machine and wait for it to exist")
machineKey, machine := deployCAPIMachine(ns.Name, cluster, k8sClient)
32 changes: 31 additions & 1 deletion controllers/vmware/vspherecluster_reconciler.go
@@ -69,6 +69,8 @@ type ClusterReconciler struct {
// +kubebuilder:rbac:groups=netoperator.vmware.com,resources=networks,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;update;create;delete
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machinedeployments,verbs=get;list;watch
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines,verbs=get;list;watch

func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
log := ctrl.LoggerFrom(ctx)
@@ -172,7 +174,7 @@ func (r *ClusterReconciler) reconcileNormal(ctx context.Context, clusterCtx *vmw
// Reconcile ResourcePolicy before we create the machines. If the ResourcePolicy is not reconciled before we create the Node VMs,
// it will be handled by vm operator by relocating the VMs to the ResourcePool and Folder specified by the ResourcePolicy.
// Reconciling the ResourcePolicy early potentially saves us the extra relocate operation.
resourcePolicyName, err := r.ResourcePolicyService.ReconcileResourcePolicy(ctx, clusterCtx)
resourcePolicyName, err := r.ResourcePolicyService.ReconcileResourcePolicy(ctx, clusterCtx.Cluster, clusterCtx.VSphereCluster)
if err != nil {
conditions.MarkFalse(clusterCtx.VSphereCluster, vmwarev1.ResourcePolicyReadyCondition, vmwarev1.ResourcePolicyCreationFailedReason, clusterv1.ConditionSeverityWarning, err.Error())
return errors.Wrapf(err,
@@ -370,6 +372,34 @@ func (r *ClusterReconciler) VSphereMachineToCluster(ctx context.Context, o clien
}}
}

// MachineDeploymentToCluster adds reconcile requests for a Cluster when one of its machineDeployments has an event.
func (r *ClusterReconciler) MachineDeploymentToCluster(ctx context.Context, o client.Object) []reconcile.Request {
log := ctrl.LoggerFrom(ctx)

machineDeployment, ok := o.(*clusterv1.MachineDeployment)
if !ok {
log.Error(nil, fmt.Sprintf("Expected a MachineDeployment but got a %T", o))
return nil
}
log = log.WithValues("MachineDeployment", klog.KObj(machineDeployment))
ctx = ctrl.LoggerInto(ctx, log)

vsphereCluster, err := util.GetVMwareVSphereClusterFromMachineDeployment(ctx, r.Client, machineDeployment)
if err != nil {
log.V(4).Error(err, "Failed to get VSphereCluster from MachineDeployment")
return nil
}

// Can add further filters on Cluster state so that we don't keep reconciling Cluster
log.V(6).Info("Triggering VSphereCluster reconcile from MachineDeployment")
return []ctrl.Request{{
NamespacedName: types.NamespacedName{
Namespace: vsphereCluster.Namespace,
Name: vsphereCluster.Name,
},
}}
}

// ZoneToVSphereClusters adds reconcile requests for VSphereClusters when Zone has an event.
func (r *ClusterReconciler) ZoneToVSphereClusters(ctx context.Context, o client.Object) []reconcile.Request {
log := ctrl.LoggerFrom(ctx)
4 changes: 4 additions & 0 deletions controllers/vspherecluster_controller.go
@@ -83,6 +83,10 @@ func AddClusterControllerToManager(ctx context.Context, controllerManagerCtx *ca
&vmwarev1.VSphereMachine{},
handler.EnqueueRequestsFromMapFunc(reconciler.VSphereMachineToCluster),
).
Watches(
&clusterv1.MachineDeployment{},
handler.EnqueueRequestsFromMapFunc(reconciler.MachineDeploymentToCluster),
).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, controllerManagerCtx.WatchFilterValue))

// Conditionally add a Watch for topologyv1.Zone when the feature gate is enabled
3 changes: 2 additions & 1 deletion pkg/services/interfaces.go
@@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1"
vmwarev1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/vmware/v1beta1"
capvcontext "sigs.k8s.io/cluster-api-provider-vsphere/pkg/context"
"sigs.k8s.io/cluster-api-provider-vsphere/pkg/context/vmware"
)
@@ -63,7 +64,7 @@ type ControlPlaneEndpointService interface {
type ResourcePolicyService interface {
// ReconcileResourcePolicy ensures that a VirtualMachineSetResourcePolicy exists for the cluster
// Returns the name of a policy if it exists, otherwise returns an error
ReconcileResourcePolicy(ctx context.Context, clusterCtx *vmware.ClusterContext) (string, error)
ReconcileResourcePolicy(ctx context.Context, cluster *clusterv1.Cluster, vSphereCluster *vmwarev1.VSphereCluster) (string, error)
}

// NetworkProvider provision network resources and configures VM based on network type.
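Since callers now pass the CAPI Cluster and the VSphereCluster directly instead of a *vmware.ClusterContext, a test double for this interface only needs those two objects. A minimal sketch, assuming a hypothetical fake used in unit tests (the fake type and its fields are not part of this commit; only the method signature above is):

    package services_test

    import (
        "context"

        clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"

        vmwarev1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/vmware/v1beta1"
    )

    // fakeResourcePolicyService is a hypothetical stand-in for the updated
    // ResourcePolicyService interface.
    type fakeResourcePolicyService struct {
        err error
    }

    // ReconcileResourcePolicy satisfies the updated interface; like the real
    // implementation, it names the policy after the CAPI Cluster.
    func (f *fakeResourcePolicyService) ReconcileResourcePolicy(_ context.Context, cluster *clusterv1.Cluster, _ *vmwarev1.VSphereCluster) (string, error) {
        if f.err != nil {
            return "", f.err
        }
        return cluster.Name, nil
    }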
2 changes: 2 additions & 0 deletions pkg/services/vmoperator/constants.go
@@ -22,6 +22,8 @@ const (

// ControlPlaneVMClusterModuleGroupName is the name used for the control plane Cluster Module.
ControlPlaneVMClusterModuleGroupName = "control-plane-group"
// ClusterWorkerVMClusterModuleGroupName is the name used for the worker Cluster Module when using mode Cluster.
ClusterWorkerVMClusterModuleGroupName = "workers-group"
// ClusterModuleNameAnnotationKey is key for the Cluster Module annotation.
ClusterModuleNameAnnotationKey = "vsphere-cluster-module-group"
// ProviderTagsAnnotationKey is the key used for the provider tags annotation.
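For orientation, the group names above end up in the VirtualMachineSetResourcePolicy's spec.clusterModuleGroups, and an individual VirtualMachine is associated with a group through the ClusterModuleNameAnnotationKey annotation (the new resource_policy.go code below reads this annotation back from existing VMs). A rough, hypothetical sketch of a worker VM carrying that annotation; the object and its names are illustrative only and not taken from this commit:

    package main

    import (
        vmoprv1 "github.com/vmware-tanzu/vm-operator/api/v1alpha2"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // Illustrative only: a worker VirtualMachine placed into the shared workers
    // cluster module via the "vsphere-cluster-module-group" annotation.
    var exampleWorkerVM = vmoprv1.VirtualMachine{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "my-cluster-md-0-abcde", // hypothetical VM name
            Namespace: "my-namespace",          // hypothetical namespace
            Annotations: map[string]string{
                // ClusterModuleNameAnnotationKey: ClusterWorkerVMClusterModuleGroupName
                "vsphere-cluster-module-group": "workers-group",
            },
        },
    }

    func main() {}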
207 changes: 158 additions & 49 deletions pkg/services/vmoperator/resource_policy.go
@@ -18,15 +18,22 @@ package vmoperator

import (
"context"
"fmt"
"sort"

"github.com/pkg/errors"
vmoprv1 "github.com/vmware-tanzu/vm-operator/api/v1alpha2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"sigs.k8s.io/cluster-api-provider-vsphere/pkg/context/vmware"
vmwarev1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/vmware/v1beta1"
"sigs.k8s.io/cluster-api-provider-vsphere/feature"
)

// RPService represents the ability to reconcile a VirtualMachineSetResourcePolicy via vmoperator.
@@ -36,73 +43,175 @@ type RPService struct {

// ReconcileResourcePolicy ensures that a VirtualMachineSetResourcePolicy exists for the cluster
// Returns the name of a policy if it exists, otherwise returns an error.
func (s *RPService) ReconcileResourcePolicy(ctx context.Context, clusterCtx *vmware.ClusterContext) (string, error) {
resourcePolicy, err := s.getVirtualMachineSetResourcePolicy(ctx, clusterCtx)
func (s *RPService) ReconcileResourcePolicy(ctx context.Context, cluster *clusterv1.Cluster, vSphereCluster *vmwarev1.VSphereCluster) (string, error) {
clusterModuleGroups, err := getTargetClusterModuleGroups(ctx, s.Client, cluster, vSphereCluster)
if err != nil {
return "", err
}

resourcePolicyName := cluster.Name
resourcePolicy := &vmoprv1.VirtualMachineSetResourcePolicy{}

if err := s.Client.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: resourcePolicyName}, resourcePolicy); err != nil {
if !apierrors.IsNotFound(err) {
return "", errors.Errorf("unexpected error in getting the Resource policy: %+v", err)
return "", errors.Wrap(err, "failed to get existing VirtualMachineSetResourcePolicy")
}
resourcePolicy, err = s.createVirtualMachineSetResourcePolicy(ctx, clusterCtx)
if err != nil {
return "", errors.Errorf("failed to create Resource Policy: %+v", err)

resourcePolicy = &vmoprv1.VirtualMachineSetResourcePolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: resourcePolicyName,
},
}

if err := s.mutateResourcePolicy(resourcePolicy, clusterModuleGroups, cluster, vSphereCluster, true); err != nil {
return "", errors.Wrap(err, "failed to mutate VirtualMachineSetResourcePolicy")
}

if err := s.Client.Create(ctx, resourcePolicy); err != nil {
return "", errors.Wrap(err, "failed to create VirtualMachineSetResourcePolicy")
}

return resourcePolicyName, nil
}

// Ensure .spec.clusterModuleGroups is up to date.
helper, err := patch.NewHelper(resourcePolicy, s.Client)
if err != nil {
return "", err
}

if err := s.mutateResourcePolicy(resourcePolicy, clusterModuleGroups, cluster, vSphereCluster, false); err != nil {
return "", errors.Wrap(err, "failed to mutate VirtualMachineSetResourcePolicy")
}

resourcePolicy.Spec.ClusterModuleGroups = clusterModuleGroups
if err := helper.Patch(ctx, resourcePolicy); err != nil {
return "", err
}

return resourcePolicy.Name, nil
return resourcePolicyName, nil
}

func (s *RPService) newVirtualMachineSetResourcePolicy(clusterCtx *vmware.ClusterContext) *vmoprv1.VirtualMachineSetResourcePolicy {
return &vmoprv1.VirtualMachineSetResourcePolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterCtx.Cluster.Namespace,
Name: clusterCtx.Cluster.Name,
},
func (s *RPService) mutateResourcePolicy(resourcePolicy *vmoprv1.VirtualMachineSetResourcePolicy, clusterModuleGroups []string, cluster *clusterv1.Cluster, vSphereCluster *vmwarev1.VSphereCluster, isCreate bool) error {
// Always ensure the owner reference
if err := ctrlutil.SetOwnerReference(vSphereCluster, resourcePolicy, s.Client.Scheme()); err != nil {
return errors.Wrapf(err, "failed to set owner reference for virtualMachineSetResourcePolicy %s for cluster %s", klog.KObj(resourcePolicy), klog.KObj(vSphereCluster))
}

// Always ensure the clusterModuleGroups are up-to-date.
resourcePolicy.Spec.ClusterModuleGroups = clusterModuleGroups

// On create: Also set resourcePool and folder
if isCreate {
resourcePolicy.Spec.Folder = cluster.Name
resourcePolicy.Spec.ResourcePool = vmoprv1.ResourcePoolSpec{
Name: cluster.Name,
}
}

return nil
}

func (s *RPService) getVirtualMachineSetResourcePolicy(ctx context.Context, clusterCtx *vmware.ClusterContext) (*vmoprv1.VirtualMachineSetResourcePolicy, error) {
func getVirtualMachineSetResourcePolicy(ctx context.Context, ctrlClient client.Client, cluster *clusterv1.Cluster) (*vmoprv1.VirtualMachineSetResourcePolicy, error) {
vmResourcePolicy := &vmoprv1.VirtualMachineSetResourcePolicy{}
vmResourcePolicyName := client.ObjectKey{
Namespace: clusterCtx.Cluster.Namespace,
Name: clusterCtx.Cluster.Name,
Namespace: cluster.Namespace,
Name: cluster.Name,
}
err := s.Client.Get(ctx, vmResourcePolicyName, vmResourcePolicy)
return vmResourcePolicy, err
if err := ctrlClient.Get(ctx, vmResourcePolicyName, vmResourcePolicy); err != nil {
return nil, err
}

return vmResourcePolicy, nil
}

func (s *RPService) createVirtualMachineSetResourcePolicy(ctx context.Context, clusterCtx *vmware.ClusterContext) (*vmoprv1.VirtualMachineSetResourcePolicy, error) {
vmResourcePolicy := s.newVirtualMachineSetResourcePolicy(clusterCtx)
func getFallbackWorkerClusterModuleGroupName(clusterName string) string {
return fmt.Sprintf("%s-workers-0", clusterName)
}

_, err := ctrlutil.CreateOrPatch(ctx, s.Client, vmResourcePolicy, func() error {
vmResourcePolicy.Spec = vmoprv1.VirtualMachineSetResourcePolicySpec{
ResourcePool: vmoprv1.ResourcePoolSpec{
Name: clusterCtx.Cluster.Name,
},
Folder: clusterCtx.Cluster.Name,
ClusterModuleGroups: []string{
ControlPlaneVMClusterModuleGroupName,
getMachineDeploymentNameForCluster(clusterCtx.Cluster),
},
}
// Ensure that the VirtualMachineSetResourcePolicy is owned by the VSphereCluster
if err := ctrlutil.SetOwnerReference(
clusterCtx.VSphereCluster,
vmResourcePolicy,
s.Client.Scheme(),
); err != nil {
return errors.Wrapf(
err,
"error setting %s/%s as owner of %s/%s",
clusterCtx.VSphereCluster.Namespace,
clusterCtx.VSphereCluster.Name,
vmResourcePolicy.Namespace,
vmResourcePolicy.Name,
)
func getWorkerAntiAffinityMode(vSphereCluster *vmwarev1.VSphereCluster) vmwarev1.VSphereClusterWorkerAntiAffinityMode {
if vSphereCluster.Spec.Placement == nil || vSphereCluster.Spec.Placement.WorkerAntiAffinity == nil {
return vmwarev1.VSphereClusterWorkerAntiAffinityModeCluster
}

return vSphereCluster.Spec.Placement.WorkerAntiAffinity.Mode
}

func getTargetClusterModuleGroups(ctx context.Context, ctrlClient client.Client, cluster *clusterv1.Cluster, vSphereCluster *vmwarev1.VSphereCluster) ([]string, error) {
if !feature.Gates.Enabled(feature.WorkerAntiAffinity) {
// Fallback to old behaviour
return []string{
ControlPlaneVMClusterModuleGroupName,
getFallbackWorkerClusterModuleGroupName(cluster.Name),
}, nil
}
// Always add a cluster module for control plane machines.
modules := []string{
ControlPlaneVMClusterModuleGroupName,
}

switch mode := getWorkerAntiAffinityMode(vSphereCluster); mode {
case vmwarev1.VSphereClusterWorkerAntiAffinityModeNone:
// Only configure a cluster module for control-plane nodes
case vmwarev1.VSphereClusterWorkerAntiAffinityModeCluster:
// Add an additional cluster module for workers when using Cluster mode.
modules = append(modules, ClusterWorkerVMClusterModuleGroupName)
case vmwarev1.VSphereClusterWorkerAntiAffinityModeMachineDeployment:
// Add an additional cluster module for each MachineDeployment's workers when using MachineDeployment mode.
machineDeploymentNames, err := getMachineDeploymentNamesForCluster(ctx, ctrlClient, cluster)
if err != nil {
return nil, err
}
return nil
})

modules = append(modules, machineDeploymentNames...)
default:
return nil, errors.Errorf("unknown mode %q configured for WorkerAntiAffinity", mode)
}

// Add cluster modules from existing VirtualMachines and deduplicate with the target ones.
existingModules, err := getVirtualMachineClusterModulesForCluster(ctx, ctrlClient, cluster)
if err != nil {
return nil, err
}
return vmResourcePolicy, nil
modules = existingModules.Insert(modules...).UnsortedList()

// Sort elements to have deterministic output.
sort.Strings(modules)

return modules, nil
}

func getVirtualMachineClusterModulesForCluster(ctx context.Context, ctrlClient client.Client, cluster *clusterv1.Cluster) (sets.Set[string], error) {
labels := map[string]string{clusterv1.ClusterNameLabel: cluster.GetName()}
virtualMachineList := &vmoprv1.VirtualMachineList{}
if err := ctrlClient.List(
ctx, virtualMachineList,
client.InNamespace(cluster.GetNamespace()),
client.MatchingLabels(labels)); err != nil {
return nil, errors.Wrap(err, "failed to list VirtualMachine objects")
}

clusterModules := sets.Set[string]{}
for _, virtualMachine := range virtualMachineList.Items {
if clusterModule, ok := virtualMachine.Annotations[ClusterModuleNameAnnotationKey]; ok {
clusterModules = clusterModules.Insert(clusterModule)
}
}
return clusterModules, nil
}

func checkClusterModuleGroup(ctx context.Context, ctrlClient client.Client, cluster *clusterv1.Cluster, clusterModuleGroupName string) error {
resourcePolicy, err := getVirtualMachineSetResourcePolicy(ctx, ctrlClient, cluster)
if err != nil {
return err
}

for _, cm := range resourcePolicy.Status.ClusterModules {
if cm.GroupName == clusterModuleGroupName {
return nil
}
}

return errors.Errorf("VirtualMachineSetResourcePolicy's .status.clusterModules does not yet contain group %q", clusterModuleGroupName)
}
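To make the mode handling in getTargetClusterModuleGroups concrete, here is a short standalone sketch (not part of the commit) of the group lists it is expected to produce for a cluster with no pre-existing VirtualMachines and the WorkerAntiAffinity feature gate enabled; the helper and the names "md-0"/"md-1" are illustrative:

    package main

    import (
        "fmt"
        "sort"
    )

    // expectedClusterModuleGroups mirrors the switch in getTargetClusterModuleGroups
    // for a cluster with no pre-existing VirtualMachines. Illustrative only.
    func expectedClusterModuleGroups(mode string, machineDeploymentNames []string) []string {
        groups := []string{"control-plane-group"}
        switch mode {
        case "None":
            // Control plane only; workers get no cluster module.
        case "Cluster":
            // All workers share a single cluster module.
            groups = append(groups, "workers-group")
        case "MachineDeployment":
            // One cluster module per MachineDeployment.
            groups = append(groups, machineDeploymentNames...)
        }
        sort.Strings(groups)
        return groups
    }

    func main() {
        fmt.Println(expectedClusterModuleGroups("None", nil))
        // [control-plane-group]
        fmt.Println(expectedClusterModuleGroups("Cluster", nil))
        // [control-plane-group workers-group]
        fmt.Println(expectedClusterModuleGroups("MachineDeployment", []string{"md-0", "md-1"}))
        // [control-plane-group md-0 md-1]
    }

With the feature gate disabled, the function instead falls back to the old two-group layout of control-plane-group plus <cluster-name>-workers-0, and in every case groups already referenced by existing VirtualMachines are merged back in so that modules are not dropped while VMs still use them. The new checkClusterModuleGroup helper then lets callers confirm that a given group has actually appeared in the policy's .status.clusterModules before relying on it.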