Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"--log-level=3",
"--v=5",
"--enable-operator-policy=true",
"--evaluation-backoff=1",
"--evaluation-backoff=2",
],
"env": {
"WATCH_NAMESPACE": "managed",
Expand All @@ -35,7 +35,7 @@
"--v=5",
"--enable-operator-policy=true",
"--target-kubeconfig-path=${workspaceFolder}/kubeconfig_managed2",
"--evaluation-backoff=1",
"--evaluation-backoff=2",
],
"env": {
"WATCH_NAMESPACE": "managed",
Expand Down
33 changes: 11 additions & 22 deletions controllers/configurationpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/base64"
"errors"
"fmt"
"math"
"reflect"
"regexp"
"sort"
Expand Down Expand Up @@ -106,6 +105,7 @@ func (r *ConfigurationPolicyReconciler) SetupWithManager(
Named(ControllerName).
WithOptions(controller.Options{
MaxConcurrentReconciles: int(evaluationConcurrency),
RateLimiter: newPolicyRateLimiter(r.EvalBackoffSeconds),
}).
For(&policyv1.ConfigurationPolicy{}, builder.WithPredicates(
predicate.Funcs{
Expand Down Expand Up @@ -449,13 +449,6 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
}
}

lastEvaluated, err := time.Parse(time.RFC3339, policy.Status.LastEvaluated)
if err != nil {
log.Error(err, "The policy has an invalid status.lastEvaluated value. Will evaluate it now.")

return true, 0
}

usesSelector := policy.Spec.NamespaceSelector.LabelSelector != nil ||
len(policy.Spec.NamespaceSelector.Include) != 0

Expand Down Expand Up @@ -488,20 +481,6 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
return false, 0

case errors.Is(getIntervalErr, policyv1.ErrIsWatch):
minNextEval := lastEvaluated.Add(time.Second * time.Duration(r.EvalBackoffSeconds))
durationLeft := minNextEval.Sub(now)

if durationLeft > 0 {
log.V(1).Info(
"The policy evaluation is configured for a watch event but rescheduling the evaluation due to the "+
"configured evaluation backoff",
"evaluationBackoffSeconds", r.EvalBackoffSeconds,
"remainingSeconds", math.Round(durationLeft.Seconds()),
)

return false, durationLeft
}

log.V(1).Info("The policy evaluation is configured for a watch event. Will evaluate now.")

return true, 0
Expand All @@ -516,6 +495,16 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
return true, 0
}

// At this point, we have a valid evaluation interval, we can now determine
// how long we need to wait (if at all).

lastEvaluated, err := time.Parse(time.RFC3339, policy.Status.LastEvaluated)
if err != nil {
log.Error(err, "The policy has an invalid status.lastEvaluated value. Will evaluate it now.")

return true, 0
}

nextEvaluation := lastEvaluated.Add(interval)
durationLeft := nextEvaluation.Sub(now)

Expand Down
4 changes: 4 additions & 0 deletions controllers/operatorpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -128,6 +129,9 @@ func (r *OperatorPolicyReconciler) SetupWithManager(
) error {
return ctrl.NewControllerManagedBy(mgr).
Named(OperatorControllerName).
WithOptions(controller.Options{
RateLimiter: newPolicyRateLimiter(2),
}).
For(&policyv1beta1.OperatorPolicy{}, builder.WithPredicates(predicate.Funcs{
// Skip most pure status/metadata updates
UpdateFunc: func(e event.UpdateEvent) bool {
Expand Down
64 changes: 64 additions & 0 deletions controllers/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2025 Red Hat, Inc.
// Copyright Contributors to the Open Cluster Management project

package controllers

import (
"sync"
"time"

"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func newPolicyRateLimiter(minimumSecondsPerItem uint32) workqueue.TypedRateLimiter[reconcile.Request] {
return workqueue.NewTypedMaxOfRateLimiter(
// Based on the one in [email protected] DefaultTypedControllerRateLimiter
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](
500*time.Millisecond, // base delay: 0.5 seconds (5ms in client-go's default)
10*time.Minute, // max delay: 10 minutes (16m40s in client-go's default)
),
// This is an overall (not per-item) limiter with 10 qps, 100 bucket size.
// This is identical to the one in [email protected] DefaultTypedControllerRateLimiter
&workqueue.TypedBucketRateLimiter[reconcile.Request]{
Limiter: rate.NewLimiter(rate.Limit(10), 100),
},
// This limits each item individually, so each has a minimum interval between reconciles.
&PerItemRateLimiter[reconcile.Request]{
limiters: map[reconcile.Request]*rate.Limiter{},
rate: rate.Every(time.Second * time.Duration(minimumSecondsPerItem)),
burst: 1,
},
)
}

type PerItemRateLimiter[T comparable] struct {
lock sync.Mutex
limiters map[T]*rate.Limiter
rate rate.Limit
burst int
}

// Forget is a no-op for a PerItemRateLimiter. RateLimiters in client-go only limit retries on
// failures, but this limiter applies to *all* requests.
func (r *PerItemRateLimiter[T]) Forget(item T) {

Check failure on line 45 in controllers/ratelimit.go

View workflow job for this annotation

GitHub Actions / Preflight Tests

unused-parameter: parameter 'item' seems to be unused, consider removing or renaming it as _ (revive)
}

// NumRequeues always returns 0 for a PerItemRateLimiter.
func (r *PerItemRateLimiter[T]) NumRequeues(item T) int {

Check failure on line 49 in controllers/ratelimit.go

View workflow job for this annotation

GitHub Actions / Preflight Tests

unused-parameter: parameter 'item' seems to be unused, consider removing or renaming it as _ (revive)
return 0
}

func (r *PerItemRateLimiter[T]) When(item T) time.Duration {
r.lock.Lock()
defer r.lock.Unlock()

limiter, ok := r.limiters[item]
if !ok {
limiter = rate.NewLimiter(r.rate, r.burst)
r.limiters[item] = limiter
}

return limiter.Reserve().Delay()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/stretchr/testify v1.10.0
go.uber.org/zap v1.27.0
golang.org/x/mod v0.24.0
golang.org/x/time v0.11.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.31.9
k8s.io/apiextensions-apiserver v0.31.9
Expand Down Expand Up @@ -101,7 +102,6 @@ require (
golang.org/x/sys v0.33.0 // indirect
golang.org/x/term v0.32.0 // indirect
golang.org/x/text v0.25.0 // indirect
golang.org/x/time v0.11.0 // indirect
golang.org/x/tools v0.33.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func parseOpts(flags *pflag.FlagSet, args []string) *ctrlOpts {
flags.Uint32Var(
&opts.evalBackoffSeconds,
"evaluation-backoff",
10,
5,
"The number of seconds before a policy is eligible for reevaluation in watch mode (throttles frequently "+
"evaluated policies)",
)
Expand Down
4 changes: 2 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestRunMain(t *testing.T) {
"--leader-elect=false",
fmt.Sprintf("--target-kubeconfig-path=%s", os.Getenv("TARGET_KUBECONFIG_PATH")),
"--log-level=1",
// Speed up the tests by not throttling the policy evaluations
"--evaluation-backoff=1",
// Speed up the tests by not throttling the policy evaluations very much
"--evaluation-backoff=2",
"--enable-operator-policy=true",
)

Expand Down
Loading