Skip to content

Commit

Permalink
feat: add binding watcher (Azure#728)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arvindthiru authored Mar 28, 2024
1 parent cf5ad6b commit 1f4d908
Show file tree
Hide file tree
Showing 8 changed files with 690 additions and 277 deletions.
105 changes: 105 additions & 0 deletions pkg/controllers/clusterresourcebindingwatcher/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
package clusterresourcebindingwatcher

import (
"context"
"flag"
"path/filepath"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/klog/v2/textlogger"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/test/utils/controller"
)

// Suite-wide state shared between BeforeSuite, AfterSuite and the specs.
var (
// cfg is the REST config returned by testEnv.Start.
cfg *rest.Config
// mgr is the controller manager the Reconciler under test is registered with.
mgr manager.Manager
// k8sClient is the client built from cfg for use by the specs.
k8sClient client.Client
// testEnv is the envtest environment started in BeforeSuite and stopped in AfterSuite.
testEnv *envtest.Environment
// ctx is the context passed to mgr.Start; cancel cancels it in AfterSuite.
ctx context.Context
cancel context.CancelFunc
// fakePlacementController is injected into the Reconciler as its PlacementController.
fakePlacementController *controller.FakeController
)

// TestAPIs is the go test entry point: it wires Gomega failures into Ginkgo
// and runs every spec registered in this package.
func TestAPIs(t *testing.T) {
	const suiteName = "ClusterResourceBinding Watcher Suite"

	RegisterFailHandler(Fail)
	RunSpecs(t, suiteName)
}

// BeforeSuite starts the envtest environment, builds the k8s client and the
// controller manager, registers the Reconciler under test with a fake placement
// controller, and finally starts the manager in the background.
var _ = BeforeSuite(func() {
	klog.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

	ctx, cancel = context.WithCancel(context.TODO())

	By("bootstrapping test environment")
	testEnv = &envtest.Environment{
		CRDDirectoryPaths:     []string{filepath.Join("../../../", "config", "crd", "bases")},
		ErrorIfCRDPathMissing: true,
	}

	var err error
	cfg, err = testEnv.Start()
	Expect(err).Should(Succeed())
	Expect(cfg).NotTo(BeNil())

	err = fleetv1beta1.AddToScheme(scheme.Scheme)
	Expect(err).NotTo(HaveOccurred())

	By("construct the k8s client")
	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
	Expect(err).Should(Succeed())
	Expect(k8sClient).NotTo(BeNil())

	By("starting the controller manager")
	klog.InitFlags(flag.CommandLine)
	flag.Parse()

	mgr, err = ctrl.NewManager(cfg, ctrl.Options{
		Scheme: scheme.Scheme,
		Metrics: server.Options{
			BindAddress: "0",
		},
		Logger: textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(4))),
	})
	Expect(err).Should(Succeed())

	fakePlacementController = &controller.FakeController{}

	err = (&Reconciler{
		Client:              mgr.GetClient(),
		PlacementController: fakePlacementController,
	}).SetupWithManager(mgr)
	Expect(err).Should(Succeed())

	go func() {
		defer GinkgoRecover()
		// Assert on the result directly instead of assigning to the err variable
		// captured from the enclosing function, so this goroutine never writes to
		// state owned by the setup code.
		Expect(mgr.Start(ctx)).Should(Succeed(), "failed to run manager")
	}()
})

// AfterSuite cancels the suite context, stops the envtest environment, and
// flushes klog on the way out.
var _ = AfterSuite(func() {
	defer klog.Flush()

	// Cancel the context that was handed to mgr.Start in BeforeSuite.
	cancel()

	By("tearing down the test environment")
	Expect(testEnv.Stop()).Should(Succeed())
})
122 changes: 122 additions & 0 deletions pkg/controllers/clusterresourcebindingwatcher/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

// Package clusterresourcebindingwatcher features a controller to watch the clusterResourceBinding changes.
package clusterresourcebindingwatcher

import (
"context"
"fmt"
"time"

"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils/condition"
"go.goms.io/fleet/pkg/utils/controller"
)

// Reconciler reconciles updates to clusterResourceBinding.
type Reconciler struct {
// Client is the client the controller uses to access the hub cluster.
client.Client
// PlacementController maintains a rate limited queue which is used to store
// the names of clusterResourcePlacements, and a reconcile function to consume the items in the queue.
PlacementController controller.Controller
}

// Reconcile handles a clusterResourceBinding event by enqueuing the CRP name
// found in the binding's CRPTrackingLabel on the PlacementController.
//
// A binding that carries a deletion timestamp, or whose CRPTrackingLabel is
// missing or empty, is skipped without returning an error. A failure to get
// the binding is logged and returned through controller.NewAPIServerError,
// after client.IgnoreNotFound has been applied to it.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	ref := klog.KRef("", req.Name)

	begin := time.Now()
	klog.V(2).InfoS("ClusterResourceBindingWatcher Reconciliation starts", "clusterResourceBinding", ref)
	defer func() {
		elapsedMs := time.Since(begin).Milliseconds()
		klog.V(2).InfoS("ClusterResourceBindingWatcher Reconciliation ends", "clusterResourceBinding", ref, "latency", elapsedMs)
	}()

	crb := &fleetv1beta1.ClusterResourceBinding{}
	getErr := r.Client.Get(ctx, req.NamespacedName, crb)
	if getErr != nil {
		klog.ErrorS(getErr, "Failed to get cluster resource binding", "clusterResourceBinding", ref)
		return ctrl.Result{}, controller.NewAPIServerError(true, client.IgnoreNotFound(getErr))
	}

	// A binding that is being deleted needs no work. Normally this is not reached,
	// since the event filter drops all deletion events.
	if crb.DeletionTimestamp != nil {
		return ctrl.Result{}, nil
	}

	// The CRP name is carried by the CRPTrackingLabel on the binding.
	crpName := crb.Labels[fleetv1beta1.CRPTrackingLabel]
	if crpName == "" {
		// The label is absent or empty; normally this should never occur. The controller
		// cannot recover from this by itself, so no error is returned: should the label
		// value be corrected, the controller will be triggered again.
		labelErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("CRPTrackingLabel is missing or value is empty"))
		klog.ErrorS(labelErr, "CRPTrackingLabel is not present", "clusterResourceBinding", ref)
		return ctrl.Result{}, nil
	}

	// Hand the CRP name over for reconciling.
	r.PlacementController.Enqueue(crpName)
	return ctrl.Result{}, nil
}

// SetupWithManager registers the Reconciler with the manager as a controller for
// ClusterResourceBinding objects, filtered so that only update events which change
// one of the tracked binding conditions trigger a reconcile.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Creation and deletion events are ignored because the clusterSchedulingPolicySnapshot
	// status is updated when bindings are created/deleted, and the clusterSchedulingPolicySnapshot
	// controller enqueues the CRP name for reconciling whenever the
	// clusterSchedulingPolicySnapshot is updated.
	dropCreate := func(event.CreateEvent) bool { return false }
	dropDelete := func(event.DeleteEvent) bool { return false }

	// Update events pass only when they are well formed and a tracked condition changed.
	filterUpdate := func(e event.UpdateEvent) bool {
		if e.ObjectOld == nil || e.ObjectNew == nil {
			err := controller.NewUnexpectedBehaviorError(fmt.Errorf("update event is invalid"))
			klog.ErrorS(err, "Failed to process update event")
			return false
		}

		before, beforeOk := e.ObjectOld.(*fleetv1beta1.ClusterResourceBinding)
		after, afterOk := e.ObjectNew.(*fleetv1beta1.ClusterResourceBinding)
		if !(beforeOk && afterOk) {
			err := controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to cast runtime objects in update event to cluster resource binding objects"))
			klog.ErrorS(err, "Failed to process update event")
			return false
		}

		return areConditionsUpdated(before, after)
	}

	bindingEvents := predicate.Funcs{
		CreateFunc: dropCreate,
		DeleteFunc: dropDelete,
		UpdateFunc: filterUpdate,
	}

	builder := ctrl.NewControllerManagedBy(mgr).
		For(&fleetv1beta1.ClusterResourceBinding{}).
		WithEventFilter(bindingEvents)
	return builder.Complete(r)
}

// areConditionsUpdated reports whether any binding condition, from
// condition.RolloutStartedCondition up to but excluding condition.TotalCondition,
// differs between oldBinding and newBinding according to condition.EqualCondition.
func areConditionsUpdated(oldBinding, newBinding *fleetv1beta1.ClusterResourceBinding) bool {
	for kind := condition.RolloutStartedCondition; kind < condition.TotalCondition; kind++ {
		condType := string(kind.ResourceBindingConditionType())
		before := oldBinding.GetCondition(condType)
		after := newBinding.GetCondition(condType)
		// The old condition's ObservedGeneration will always be less than or equal
		// to the new condition's ObservedGeneration.
		if condition.EqualCondition(before, after) {
			continue
		}
		return true
	}
	return false
}
Loading

0 comments on commit 1f4d908

Please sign in to comment.