
🌱 Replace CAPI's clustercache tracker by the new clustercache (#3236)
* Replace CAPI's clustercache tracker by the new clustercache

* Fix govmomi e2e tests

Signed-off-by: Stefan Büringer [email protected]

---------

Signed-off-by: Stefan Büringer [email protected]
Co-authored-by: Stefan Bueringer <[email protected]>
chrischdi and sbueringer authored Nov 5, 2024
1 parent 34d3698 · commit 7ac01a6
Showing 12 changed files with 179 additions and 214 deletions.
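
In short, the commit drops the deprecated remote.ClusterCacheTracker (and the separately registered remote.ClusterCacheReconciler) in favour of Cluster API's new clustercache package. A condensed before/after sketch of the setup change, using the names from controllers_suite_test.go below (testEnv, secretCachingClient, controllerOpts and ctx are assumed to exist as in that file; this is an illustration, not the exact committed code):

// Before (deprecated): build a tracker, then register a ClusterCacheReconciler for it.
tracker, err := remote.NewClusterCacheTracker(
	testEnv.Manager,
	remote.ClusterCacheTrackerOptions{
		SecretCachingClient: secretCachingClient,
		ControllerName:      "testenv-manager",
	},
)
if err != nil {
	panic(fmt.Sprintf("unable to setup ClusterCacheTracker: %v", err))
}
if err := (&remote.ClusterCacheReconciler{
	Client:  testEnv.Manager.GetClient(),
	Tracker: tracker,
}).SetupWithManager(ctx, testEnv.Manager, controllerOpts); err != nil {
	panic(fmt.Sprintf("unable to create ClusterCacheReconciler controller: %v", err))
}

// After: a single SetupWithManager call creates the ClusterCache and wires up its
// internal reconciler; the returned value is handed to the controllers that need
// access to workload clusters.
clusterCache, err := clustercache.SetupWithManager(ctx, testEnv.Manager, clustercache.Options{
	SecretClient: secretCachingClient,
	Client: clustercache.ClientOptions{
		UserAgent: remote.DefaultClusterAPIUserAgent("testenv-manager"),
		Cache: clustercache.ClientCacheOptions{
			// Don't cache ConfigMaps & Secrets.
			DisableFor: []client.Object{&corev1.ConfigMap{}, &corev1.Secret{}},
		},
	},
}, controller.Options{MaxConcurrentReconciles: 10, SkipNameValidation: ptr.To(true)})
if err != nil {
	panic(fmt.Sprintf("Unable to setup ClusterCache: %v", err))
}
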
4 changes: 0 additions & 4 deletions .golangci.yml
@@ -211,10 +211,6 @@ issues:
- linters:
- staticcheck
text: "SA1019: \"sigs.k8s.io/cluster-api-provider-vsphere/apis/(v1alpha3|v1alpha4)\" is deprecated: This package will be removed in one of the next releases."
# Deprecations for Cluster API's ClusterCacheTracker
- linters:
- staticcheck
text: "SA1019: remote.(ClusterCacheTracker|ClusterCacheReconciler|ClusterCacheTrackerOptions|NewClusterCacheTracker) is deprecated: This will be removed in Cluster API v1.10, use clustercache.(ClusterCache|SetupWithManager) instead."
# Deprecations for Cluster API's predicates.ClusterUnpausedAndInfrastructureReady
- linters:
- staticcheck
31 changes: 15 additions & 16 deletions controllers/controllers_suite_test.go
@@ -31,6 +31,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/ptr"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/clustercache"
"sigs.k8s.io/cluster-api/controllers/remote"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -54,7 +55,6 @@ func TestControllers(t *testing.T) {

var (
testEnv *helpers.TestEnvironment
tracker *remote.ClusterCacheTracker
ctx = ctrl.SetupSignalHandler()
)

@@ -82,33 +82,32 @@ func setup() {
panic("unable to create secret caching client")
}

tracker, err = remote.NewClusterCacheTracker(
testEnv.Manager,
remote.ClusterCacheTrackerOptions{
SecretCachingClient: secretCachingClient,
ControllerName: "testenv-manager",
clusterCache, err := clustercache.SetupWithManager(ctx, testEnv.Manager, clustercache.Options{
SecretClient: secretCachingClient,
Client: clustercache.ClientOptions{
UserAgent: remote.DefaultClusterAPIUserAgent("testenv-manager"),
Cache: clustercache.ClientCacheOptions{
DisableFor: []client.Object{
// Don't cache ConfigMaps & Secrets.
&corev1.ConfigMap{},
&corev1.Secret{},
},
},
},
)
}, controller.Options{MaxConcurrentReconciles: 10, SkipNameValidation: ptr.To(true)})
if err != nil {
panic(fmt.Sprintf("unable to setup ClusterCacheTracker: %v", err))
panic(fmt.Sprintf("Unable to setup ClusterCache: %v", err))
}

controllerOpts := controller.Options{MaxConcurrentReconciles: 10, SkipNameValidation: ptr.To(true)}

if err := (&remote.ClusterCacheReconciler{
Client: testEnv.Manager.GetClient(),
Tracker: tracker,
}).SetupWithManager(ctx, testEnv.Manager, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to create ClusterCacheReconciler controller: %v", err))
}

if err := AddClusterControllerToManager(ctx, testEnv.GetControllerManagerContext(), testEnv.Manager, false, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereCluster controller: %v", err))
}
if err := AddMachineControllerToManager(ctx, testEnv.GetControllerManagerContext(), testEnv.Manager, false, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereMachine controller: %v", err))
}
if err := AddVMControllerToManager(ctx, testEnv.GetControllerManagerContext(), testEnv.Manager, tracker, controllerOpts); err != nil {
if err := AddVMControllerToManager(ctx, testEnv.GetControllerManagerContext(), testEnv.Manager, clusterCache, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereVM controller: %v", err))
}
if err := AddVsphereClusterIdentityControllerToManager(ctx, testEnv.GetControllerManagerContext(), testEnv.Manager, controllerOpts); err != nil {
38 changes: 19 additions & 19 deletions controllers/vmware/controllers_suite_test.go
@@ -31,6 +31,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/ptr"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/clustercache"
"sigs.k8s.io/cluster-api/controllers/remote"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -53,9 +54,9 @@ func TestControllers(t *testing.T) {
}

var (
testEnv *helpers.TestEnvironment
tracker *remote.ClusterCacheTracker
ctx = ctrl.SetupSignalHandler()
testEnv *helpers.TestEnvironment
clusterCache clustercache.ClusterCache
ctx = ctrl.SetupSignalHandler()
)

func TestMain(m *testing.M) {
@@ -82,30 +83,29 @@ func setup() {
panic("unable to create secret caching client")
}

tracker, err = remote.NewClusterCacheTracker(
testEnv.Manager,
remote.ClusterCacheTrackerOptions{
SecretCachingClient: secretCachingClient,
ControllerName: "testenv-manager",
clusterCache, err = clustercache.SetupWithManager(ctx, testEnv.Manager, clustercache.Options{
SecretClient: secretCachingClient,
Client: clustercache.ClientOptions{
UserAgent: remote.DefaultClusterAPIUserAgent("testenv-manager"),
Cache: clustercache.ClientCacheOptions{
DisableFor: []client.Object{
// Don't cache ConfigMaps & Secrets.
&corev1.ConfigMap{},
&corev1.Secret{},
},
},
},
)
}, controller.Options{MaxConcurrentReconciles: 10, SkipNameValidation: ptr.To(true)})
if err != nil {
panic(fmt.Sprintf("unable to setup ClusterCacheTracker: %v", err))
panic(fmt.Sprintf("Unable to setup ClusterCache: %v", err))
}

controllerOpts := controller.Options{MaxConcurrentReconciles: 10, SkipNameValidation: ptr.To(true)}

if err := (&remote.ClusterCacheReconciler{
Client: testEnv.Manager.GetClient(),
Tracker: tracker,
}).SetupWithManager(ctx, testEnv.Manager, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to create ClusterCacheReconciler controller: %v", err))
}

if err := AddServiceAccountProviderControllerToManager(ctx, testEnv.GetControllerManagerContext(), testEnv.Manager, tracker, controllerOpts); err != nil {
if err := AddServiceAccountProviderControllerToManager(ctx, testEnv.GetControllerManagerContext(), testEnv.Manager, clusterCache, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup ServiceAccount controller: %v", err))
}
if err := AddServiceDiscoveryControllerToManager(ctx, testEnv.GetControllerManagerContext(), testEnv.Manager, tracker, controllerOpts); err != nil {
if err := AddServiceDiscoveryControllerToManager(ctx, testEnv.GetControllerManagerContext(), testEnv.Manager, clusterCache, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup SvcDiscovery controller: %v", err))
}

53 changes: 13 additions & 40 deletions controllers/vmware/serviceaccount_controller.go
@@ -19,7 +19,6 @@ package vmware
import (
"context"
"fmt"
"reflect"
"strings"
"time"

@@ -33,7 +32,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/controllers/clustercache"
clusterutilv1 "sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
@@ -64,16 +63,14 @@ const (
)

// AddServiceAccountProviderControllerToManager adds this controller to the provided manager.
func AddServiceAccountProviderControllerToManager(ctx context.Context, controllerManagerCtx *capvcontext.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error {
func AddServiceAccountProviderControllerToManager(ctx context.Context, controllerManagerCtx *capvcontext.ControllerManagerContext, mgr manager.Manager, clusterCache clustercache.ClusterCache, options controller.Options) error {
r := &ServiceAccountReconciler{
Client: controllerManagerCtx.Client,
Recorder: mgr.GetEventRecorderFor("providerserviceaccount-controller"),
remoteClusterCacheTracker: tracker,
Client: controllerManagerCtx.Client,
Recorder: mgr.GetEventRecorderFor("providerserviceaccount-controller"),
clusterCache: clusterCache,
}
predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "providerserviceaccount")

clusterToInfraFn := clusterToSupervisorInfrastructureMapFunc(ctx, controllerManagerCtx.Client)

// Note: The ProviderServiceAccount reconciler is watching on VSphereCluster in For() instead of
// ProviderServiceAccount because we want to reconcile all ProviderServiceAccounts of a Cluster
// sequentially in a single Reconcile.
@@ -97,42 +94,18 @@ func AddServiceAccountProviderControllerToManager(ctx context.Context, controlle
// Watches clusters and reconciles the vSphereCluster.
Watches(
&clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
requests := clusterToInfraFn(ctx, o)
if len(requests) == 0 {
return nil
}

log := ctrl.LoggerFrom(ctx, "Cluster", klog.KObj(o), "VSphereCluster", klog.KRef(requests[0].Namespace, requests[0].Name))
ctx = ctrl.LoggerInto(ctx, log)

c := &vmwarev1.VSphereCluster{}
if err := r.Client.Get(ctx, requests[0].NamespacedName, c); err != nil {
log.V(4).Error(err, "Failed to get VSphereCluster")
return nil
}

if annotations.IsExternallyManaged(c) {
log.V(6).Info("VSphereCluster is externally managed, will not attempt to map resource")
return nil
}
return requests
}),
handler.EnqueueRequestsFromMapFunc(clusterToSupervisorVSphereClusterFunc(r.Client)),
).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, controllerManagerCtx.WatchFilterValue)).
WatchesRawSource(r.clusterCache.GetClusterSource("providerserviceaccount", clusterToSupervisorVSphereClusterFunc(r.Client))).
Complete(r)
}

func clusterToSupervisorInfrastructureMapFunc(ctx context.Context, c client.Client) handler.MapFunc {
gvk := vmwarev1.GroupVersion.WithKind(reflect.TypeOf(&vmwarev1.VSphereCluster{}).Elem().Name())
return clusterutilv1.ClusterToInfrastructureMapFunc(ctx, gvk, c, &vmwarev1.VSphereCluster{})
}

// ServiceAccountReconciler reconciles changes to ProviderServiceAccounts.
type ServiceAccountReconciler struct {
Client client.Client
Recorder record.EventRecorder
remoteClusterCacheTracker *remote.ClusterCacheTracker
Client client.Client
Recorder record.EventRecorder
clusterCache clustercache.ClusterCache
}

func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, reterr error) {
@@ -198,10 +171,10 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req reconcile.
// then just return a no-op and wait for the next sync. This will occur when
// the Cluster's status is updated with a reference to the secret that has
// the Kubeconfig data used to access the target cluster.
guestClient, err := r.remoteClusterCacheTracker.GetClient(ctx, client.ObjectKeyFromObject(cluster))
guestClient, err := r.clusterCache.GetClient(ctx, client.ObjectKeyFromObject(cluster))
if err != nil {
if errors.Is(err, remote.ErrClusterLocked) {
log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker")
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
log.Error(err, "The control plane is not ready yet")
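
On the consumer side the pattern is the same in both supervisor controllers touched by this commit: keep a clustercache.ClusterCache on the reconciler, ask it for a client to the workload ("guest") cluster, and requeue while the connection is not yet established. A minimal sketch of that pattern — reconcileGuestCluster is a hypothetical helper for illustration, not a function from this commit:

// Hypothetical helper sketching the consumption pattern above. GetClient returns a
// cached client for the workload cluster, or clustercache.ErrClusterNotConnected
// while the ClusterCache has no healthy connection to it yet.
func (r *ServiceAccountReconciler) reconcileGuestCluster(ctx context.Context, cluster *clusterv1.Cluster) (ctrl.Result, error) {
	log := ctrl.LoggerFrom(ctx)

	guestClient, err := r.clusterCache.GetClient(ctx, client.ObjectKeyFromObject(cluster))
	if err != nil {
		if errors.Is(err, clustercache.ErrClusterNotConnected) {
			log.V(5).Info("Requeuing because connection to the workload cluster is down")
			return ctrl.Result{RequeueAfter: time.Minute}, nil
		}
		return ctrl.Result{}, err
	}

	// Use guestClient to reconcile objects inside the workload cluster,
	// e.g. ServiceAccounts, Roles and the corresponding token Secrets.
	_ = guestClient
	return ctrl.Result{}, nil
}

The WatchesRawSource(r.clusterCache.GetClusterSource(...)) registrations in the builders above re-trigger a reconcile for the owning VSphereCluster once the connection to the workload cluster comes up, so the periodic requeue acts as a fallback.
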
4 changes: 3 additions & 1 deletion controllers/vmware/serviceaccount_controller_intg_test.go
@@ -59,13 +59,14 @@ var _ = Describe("ProviderServiceAccount controller integration tests", func() {
helpers.CreateAndWait(ctx, intCtx.Client, intCtx.Cluster)
helpers.CreateAndWait(ctx, intCtx.Client, intCtx.VSphereCluster)
helpers.CreateAndWait(ctx, intCtx.Client, intCtx.KubeconfigSecret)
helpers.ClusterInfrastructureReady(ctx, intCtx.Client, clusterCache, intCtx.Cluster)
})

By("Verifying that the guest cluster client works")
var guestClient client.Client
var err error
Eventually(func() error {
guestClient, err = tracker.GetClient(ctx, client.ObjectKeyFromObject(intCtx.Cluster))
guestClient, err = clusterCache.GetClient(ctx, client.ObjectKeyFromObject(intCtx.Cluster))
return err
}, time.Minute, 5*time.Second).Should(Succeed())
// Note: Create a Service informer, so the test later doesn't fail if this doesn't work.
@@ -191,6 +192,7 @@ var _ = Describe("ProviderServiceAccount controller integration tests", func() {
helpers.CreateAndWait(ctx, intCtx.Client, intCtx.Cluster)
helpers.CreateAndWait(ctx, intCtx.Client, intCtx.VSphereCluster)
helpers.CreateAndWait(ctx, intCtx.Client, intCtx.KubeconfigSecret)
helpers.ClusterInfrastructureReady(ctx, intCtx.Client, clusterCache, intCtx.Cluster)
})
pSvcAccount = getTestProviderServiceAccount(intCtx.Namespace, intCtx.VSphereCluster)
pSvcAccount.Spec.TargetNamespace = "default"
72 changes: 36 additions & 36 deletions controllers/vmware/servicediscovery_controller.go
@@ -37,7 +37,7 @@ import (
bootstrapapi "k8s.io/cluster-bootstrap/token/api"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/controllers/clustercache"
clusterutilv1 "sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
@@ -75,11 +75,11 @@ const (
// +kubebuilder:rbac:groups="",resources=configmaps/status,verbs=get

// AddServiceDiscoveryControllerToManager adds the ServiceDiscovery controller to the provided manager.
func AddServiceDiscoveryControllerToManager(ctx context.Context, controllerManagerCtx *capvcontext.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error {
func AddServiceDiscoveryControllerToManager(ctx context.Context, controllerManagerCtx *capvcontext.ControllerManagerContext, mgr manager.Manager, clusterCache clustercache.ClusterCache, options controller.Options) error {
r := &serviceDiscoveryReconciler{
Client: controllerManagerCtx.Client,
Recorder: mgr.GetEventRecorderFor("servicediscovery/vspherecluster-controller"),
remoteClusterCacheTracker: tracker,
Client: controllerManagerCtx.Client,
Recorder: mgr.GetEventRecorderFor("servicediscovery/vspherecluster-controller"),
clusterCache: clusterCache,
}
predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "servicediscovery/vspherecluster")

@@ -96,7 +96,6 @@ func AddServiceDiscoveryControllerToManager(ctx context.Context, controllerManag
if err := mgr.Add(configMapCache); err != nil {
return errors.Wrapf(err, "failed to add ConfigMap cache")
}
clusterToInfraFn := clusterToVMwareInfrastructureMapFunc(ctx, controllerManagerCtx)
return ctrl.NewControllerManagedBy(mgr).For(&vmwarev1.VSphereCluster{}).
Named("servicediscovery/vspherecluster").
WithOptions(options).
@@ -114,37 +113,43 @@ func AddServiceDiscoveryControllerToManager(ctx context.Context, controllerManag
// watch the CAPI cluster
Watches(
&clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
requests := clusterToInfraFn(ctx, o)
if len(requests) == 0 {
return nil
}

log := ctrl.LoggerFrom(ctx, "Cluster", klog.KObj(o), "VSphereCluster", klog.KRef(requests[0].Namespace, requests[0].Name))
ctx = ctrl.LoggerInto(ctx, log)

c := &vmwarev1.VSphereCluster{}
if err := r.Client.Get(ctx, requests[0].NamespacedName, c); err != nil {
log.V(4).Error(err, "Failed to get VSphereCluster")
return nil
}

if annotations.IsExternallyManaged(c) {
log.V(6).Info("VSphereCluster is externally managed, will not attempt to map resource")
return nil
}
return requests
}),
handler.EnqueueRequestsFromMapFunc(clusterToSupervisorVSphereClusterFunc(r.Client)),
).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, controllerManagerCtx.WatchFilterValue)).
WatchesRawSource(r.clusterCache.GetClusterSource("servicediscovery/vspherecluster", clusterToSupervisorVSphereClusterFunc(r.Client))).
Complete(r)
}

func clusterToSupervisorVSphereClusterFunc(ctrlclient client.Client) func(ctx context.Context, obj client.Object) []reconcile.Request {
return func(ctx context.Context, obj client.Object) []reconcile.Request {
gvk := vmwarev1.GroupVersion.WithKind(reflect.TypeOf(&vmwarev1.VSphereCluster{}).Elem().Name())
requests := clusterutilv1.ClusterToInfrastructureMapFunc(ctx, gvk, ctrlclient, &vmwarev1.VSphereCluster{})(ctx, obj)
if len(requests) == 0 {
return nil
}

log := ctrl.LoggerFrom(ctx, "Cluster", klog.KObj(obj), "VSphereCluster", klog.KRef(requests[0].Namespace, requests[0].Name))
ctx = ctrl.LoggerInto(ctx, log)

c := &vmwarev1.VSphereCluster{}
if err := ctrlclient.Get(ctx, requests[0].NamespacedName, c); err != nil {
log.V(4).Error(err, "Failed to get VSphereCluster")
return nil
}

if annotations.IsExternallyManaged(c) {
log.V(6).Info("VSphereCluster is externally managed, will not attempt to map resource")
return nil
}
return requests
}
}

type serviceDiscoveryReconciler struct {
Client client.Client
Recorder record.EventRecorder

remoteClusterCacheTracker *remote.ClusterCacheTracker
clusterCache clustercache.ClusterCache
}

func (r *serviceDiscoveryReconciler) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, reterr error) {
@@ -201,10 +206,10 @@ func (r *serviceDiscoveryReconciler) Reconcile(ctx context.Context, req reconcil

// We cannot proceed until we are able to access the target cluster. Until
// then just return a no-op and wait for the next sync.
guestClient, err := r.remoteClusterCacheTracker.GetClient(ctx, client.ObjectKeyFromObject(cluster))
guestClient, err := r.clusterCache.GetClient(ctx, client.ObjectKeyFromObject(cluster))
if err != nil {
if errors.Is(err, remote.ErrClusterLocked) {
log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker")
if errors.Is(err, clustercache.ErrClusterNotConnected) {
log.V(5).Info("Requeuing because connection to the workload cluster is down")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
log.Error(err, "The control plane is not ready yet")
@@ -487,8 +492,3 @@ func allClustersRequests(ctx context.Context, c client.Client) []reconcile.Reque
}
return requests
}

func clusterToVMwareInfrastructureMapFunc(ctx context.Context, controllerCtx *capvcontext.ControllerManagerContext) handler.MapFunc {
gvk := vmwarev1.GroupVersion.WithKind(reflect.TypeOf(&vmwarev1.VSphereCluster{}).Elem().Name())
return clusterutilv1.ClusterToInfrastructureMapFunc(ctx, gvk, controllerCtx.Client, &vmwarev1.VSphereCluster{})
}