Skip to content

Block DisruptionCrons at ValidateCreate if their target does not exist #957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025 Datadog, Inc.

package controllers
package v1beta1

import (
"context"
Expand All @@ -12,8 +12,8 @@ import (
"sort"
"time"

chaosv1beta1 "github.com/DataDog/chaos-controller/api/v1beta1"
cLog "github.com/DataDog/chaos-controller/log"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -26,14 +26,14 @@ import (
)

const (
DisruptionCronNameLabel = chaosv1beta1.GroupName + "/disruption-cron-name"
DisruptionRolloutNameLabel = chaosv1beta1.GroupName + "/disruption-rollout-name"
DisruptionCronNameLabel = GroupName + "/disruption-cron-name"
DisruptionRolloutNameLabel = GroupName + "/disruption-rollout-name"
)

// GetChildDisruptions retrieves disruptions associated with a resource by its label.
// Most of the time, this will return an empty list as disruptions are typically short-lived objects.
func GetChildDisruptions(ctx context.Context, cl client.Client, log *zap.SugaredLogger, namespace, labelKey, labelVal string) (*chaosv1beta1.DisruptionList, error) {
disruptions := &chaosv1beta1.DisruptionList{}
func GetChildDisruptions(ctx context.Context, cl client.Client, log *zap.SugaredLogger, namespace, labelKey, labelVal string) (*DisruptionList, error) {
disruptions := &DisruptionList{}
labelSelector := labels.SelectorFromSet(labels.Set{labelKey: labelVal})

if err := cl.List(ctx, disruptions, client.InNamespace(namespace), &client.ListOptions{LabelSelector: labelSelector}); err != nil {
Expand All @@ -46,7 +46,7 @@ func GetChildDisruptions(ctx context.Context, cl client.Client, log *zap.Sugared

// GetTargetResource retrieves the specified target resource (Deployment or StatefulSet).
// It returns the target resource object and any error encountered during retrieval.
func GetTargetResource(ctx context.Context, cl client.Client, targetResource *chaosv1beta1.TargetResourceSpec, namespace string) (client.Object, error) {
func GetTargetResource(ctx context.Context, cl client.Client, targetResource *TargetResourceSpec, namespace string) (client.Object, error) {
var targetObj client.Object

switch targetResource.Kind {
Expand All @@ -66,7 +66,7 @@ func GetTargetResource(ctx context.Context, cl client.Client, targetResource *ch

// CheckTargetResourceExists determines if the target resource exists.
// Returns a boolean indicating presence and an error if one occurs.
func CheckTargetResourceExists(ctx context.Context, cl client.Client, targetResource *chaosv1beta1.TargetResourceSpec, namespace string) (bool, error) {
func CheckTargetResourceExists(ctx context.Context, cl client.Client, targetResource *TargetResourceSpec, namespace string) (bool, error) {
_, err := GetTargetResource(ctx, cl, targetResource, namespace)

if apierrors.IsNotFound(err) {
Expand All @@ -80,7 +80,7 @@ func CheckTargetResourceExists(ctx context.Context, cl client.Client, targetReso

// GetSelectors retrieves the labels of the specified target resource (Deployment or StatefulSet).
// Returns a set of labels to be used as Disruption selectors and an error if retrieval fails.
func GetSelectors(ctx context.Context, cl client.Client, targetResource *chaosv1beta1.TargetResourceSpec, namespace string) (labels *metav1.LabelSelector, err error) {
func GetSelectors(ctx context.Context, cl client.Client, targetResource *TargetResourceSpec, namespace string) (labels *metav1.LabelSelector, err error) {
targetObj, err := GetTargetResource(ctx, cl, targetResource, namespace)
if err != nil {
return nil, err
Expand All @@ -105,10 +105,10 @@ func GetSelectors(ctx context.Context, cl client.Client, targetResource *chaosv1

// createBaseDisruption generates a basic Disruption object using the provided owner and disruptionSpec.
// The returned Disruption object has its basic details set, but it's not saved or stored anywhere yet.
func createBaseDisruption(owner metav1.Object, disruptionSpec *chaosv1beta1.DisruptionSpec) *chaosv1beta1.Disruption {
func createBaseDisruption(owner metav1.Object, disruptionSpec *DisruptionSpec) *Disruption {
name := generateDisruptionName(owner)

return &chaosv1beta1.Disruption{
return &Disruption{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: owner.GetNamespace(),
Expand All @@ -122,7 +122,7 @@ func createBaseDisruption(owner metav1.Object, disruptionSpec *chaosv1beta1.Disr
// setDisruptionAnnotations updates the annotations of a given Disruption object with those of its owner.
// It sets a scheduled time annotation using the provided scheduledTime.
// It parses the UserInfo annotation if it exists and sets user-related annotations.
func setDisruptionAnnotations(disruption *chaosv1beta1.Disruption, owner metav1.Object, scheduledTime time.Time) error {
func setDisruptionAnnotations(disruption *Disruption, owner metav1.Object, scheduledTime time.Time) error {
disruption.CopyOwnerAnnotations(owner)

disruption.SetScheduledAtAnnotation(scheduledTime)
Expand All @@ -132,7 +132,7 @@ func setDisruptionAnnotations(disruption *chaosv1beta1.Disruption, owner metav1.

// overwriteDisruptionSelectors updates the selectors of a given Disruption object based on the provided targetResource.
// Returns an error if fetching selectors from the target resource fails.
func overwriteDisruptionSelectors(ctx context.Context, cl client.Client, disruption *chaosv1beta1.Disruption, targetResource *chaosv1beta1.TargetResourceSpec, namespace string) error {
func overwriteDisruptionSelectors(ctx context.Context, cl client.Client, disruption *Disruption, targetResource *TargetResourceSpec, namespace string) error {
// Get selectors from target resource
selectors, err := GetSelectors(ctx, cl, targetResource, namespace)
if err != nil {
Expand All @@ -157,7 +157,7 @@ func overwriteDisruptionSelectors(ctx context.Context, cl client.Client, disrupt
// CreateDisruptionFromTemplate constructs a Disruption object based on the provided owner, disruptionSpec, and targetResource.
// The function sets annotations, overwrites selectors, and associates the Disruption with its owner.
// It returns the constructed Disruption or an error if any step fails.
func CreateDisruptionFromTemplate(ctx context.Context, cl client.Client, scheme *runtime.Scheme, owner metav1.Object, targetResource *chaosv1beta1.TargetResourceSpec, disruptionSpec *chaosv1beta1.DisruptionSpec, scheduledTime time.Time, log *zap.SugaredLogger) (*chaosv1beta1.Disruption, error) {
func CreateDisruptionFromTemplate(ctx context.Context, cl client.Client, scheme *runtime.Scheme, owner metav1.Object, targetResource *TargetResourceSpec, disruptionSpec *DisruptionSpec, scheduledTime time.Time, log *zap.SugaredLogger) (*Disruption, error) {
disruption := createBaseDisruption(owner, disruptionSpec)

ownerNameLabel := getOwnerNameLabel(owner)
Expand All @@ -179,7 +179,7 @@ func CreateDisruptionFromTemplate(ctx context.Context, cl client.Client, scheme
}

// getScheduledTimeForDisruption returns the scheduled time for a particular disruption.
func getScheduledTimeForDisruption(log *zap.SugaredLogger, disruption *chaosv1beta1.Disruption) time.Time {
func getScheduledTimeForDisruption(log *zap.SugaredLogger, disruption *Disruption) time.Time {
parsedTime, err := disruption.GetScheduledAtAnnotation()
if err != nil {
log.Errorw("unable to parse schedule time for child disruption", "err", err, cLog.DisruptionNameKey, disruption.Name)
Expand All @@ -190,7 +190,7 @@ func getScheduledTimeForDisruption(log *zap.SugaredLogger, disruption *chaosv1be
}

// GetMostRecentScheduleTime returns the most recent scheduled time from a list of disruptions.
func GetMostRecentScheduleTime(log *zap.SugaredLogger, disruptions *chaosv1beta1.DisruptionList) time.Time {
func GetMostRecentScheduleTime(log *zap.SugaredLogger, disruptions *DisruptionList) time.Time {
length := len(disruptions.Items)
if length == 0 {
return time.Time{}
Expand All @@ -210,9 +210,9 @@ func GetMostRecentScheduleTime(log *zap.SugaredLogger, disruptions *chaosv1beta1
// It returns a formatted string name.
func generateDisruptionName(owner metav1.Object) string {
switch typedOwner := owner.(type) {
case *chaosv1beta1.DisruptionCron:
case *DisruptionCron:
return fmt.Sprintf("disruption-cron-%s", typedOwner.GetName())
case *chaosv1beta1.DisruptionRollout:
case *DisruptionRollout:
return fmt.Sprintf("disruption-rollout-%s", typedOwner.GetName())
}

Expand All @@ -223,9 +223,9 @@ func generateDisruptionName(owner metav1.Object) string {
// It returns the label string.
func getOwnerNameLabel(owner metav1.Object) string {
switch owner.(type) {
case *chaosv1beta1.DisruptionCron:
case *DisruptionCron:
return DisruptionCronNameLabel
case *chaosv1beta1.DisruptionRollout:
case *DisruptionRollout:
return DisruptionRolloutNameLabel
}

Expand Down
21 changes: 21 additions & 0 deletions api/v1beta1/disruption_cron_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package v1beta1

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -40,6 +41,7 @@ var (
disruptionCronPermittedUserGroups map[string]struct{}
disruptionCronPermittedUserGroupString string
disruptionCronMetricsSink metrics.Sink
requireDisruptionCronTarget bool
defaultCronDelayedStartTolerance time.Duration
minimumCronFrequency time.Duration
)
Expand All @@ -63,6 +65,7 @@ func (d *DisruptionCron) SetupWebhookWithManager(setupWebhookConfig utils.SetupW
defaultCronDelayedStartTolerance = setupWebhookConfig.DefaultCronDelayedStartTolerance
minimumCronFrequency = setupWebhookConfig.MinimumCronFrequency
defaultDuration = setupWebhookConfig.DefaultDurationFlag
requireDisruptionCronTarget = setupWebhookConfig.RequireDisruptionCronTarget

return ctrl.NewWebhookManagedBy(setupWebhookConfig.Manager).
For(d).
Expand Down Expand Up @@ -122,6 +125,24 @@ func (d *DisruptionCron) ValidateCreate() (_ admission.Warnings, err error) {
return nil, err
}

if requireDisruptionCronTarget {
var exists bool

// CheckTargetResourceExists doesn't return apierrors.NotFound. Which means if there is an error,
// we could not determine if the target existed, and should allow the Create.
if exists, err = CheckTargetResourceExists(context.Background(), k8sClient, &d.Spec.TargetResource, d.Namespace); err != nil {
log.Errorw("error checking if target resource exists", "error", err)
} else if !exists {
log.Warnw("rejecting disruption cron because target does not exist",
"targetName", d.Spec.TargetResource.Name,
"targetKind", d.Spec.TargetResource.Kind,
"error", err)

return nil, fmt.Errorf("rejecting disruption cron because target %s %s/%s does not exist",
d.Spec.TargetResource.Kind, d.Namespace, d.Spec.TargetResource.Name)
}
}

if mErr := metricsSink.MetricValidationCreated(metricTags); mErr != nil {
log.Errorw("error sending a metric", "error", mErr)
}
Expand Down
1 change: 1 addition & 0 deletions chart/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ data:
allowNodeLevel: {{ .Values.controller.safeMode.allowNodeLevel }}
allowNodeFailure: {{ .Values.controller.safeMode.allowNodeFailure }}
disruptionCronEnabled: {{ .Values.controller.disruptionCronEnabled }}
requireDisruptionCronTarget: {{ .Values.controller.requireDisruptionCronTarget | default false }}
disruptionRolloutEnabled: {{ .Values.controller.disruptionRolloutEnabled }}
disruptionDeletionTimeout: {{ .Values.controller.disruptionDeletionTimeout }}
disabledDisruptions:
Expand Down
1 change: 1 addition & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ controller:
memory: 300Mi
ephemeralStorage: 1Gi
disruptionCronEnabled: true
requireDisruptionCronTarget: false # If set to true, disruption crons will be rejected on create if we cannot find their target
disruptionRolloutEnabled: false
disruptionDeletionTimeout: 15m # The duration after which a disruption will be marked as "stuck on removal" if its removal process exceeds this duration.
aggregateToClusterRole: false # If this is true two aggregated cluster roles are created for viewing and editing (https://kubernetes.io/docs/reference/access-authn-authz/rbac/#aggregated-clusterroles)
Expand Down
13 changes: 10 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"os"
"time"

cloudtypes "github.com/DataDog/chaos-controller/cloudservice/types"
"github.com/DataDog/chaos-controller/eventnotifier"

"github.com/cenkalti/backoff"
"github.com/fsnotify/fsnotify"
"github.com/spf13/pflag"
Expand All @@ -20,9 +23,6 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"

cloudtypes "github.com/DataDog/chaos-controller/cloudservice/types"
"github.com/DataDog/chaos-controller/eventnotifier"
)

type config struct {
Expand Down Expand Up @@ -52,6 +52,7 @@ type controllerConfig struct {
ProfilerSink string `json:"profilerSink" yaml:"profilerSink"`
TracerSink string `json:"tracerSink" yaml:"tracerSink"`
DisruptionCronEnabled bool `json:"disruptionCronEnabled" yaml:"disruptionCronEnabled"`
RequireDisruptionCronTarget bool `json:"requireDisruptionCronTarget" yaml:"requireDisruptionCronTarget"`
DisruptionRolloutEnabled bool `json:"disruptionRolloutEnabled" yaml:"disruptionRolloutEnabled"`
DisruptionDeletionTimeout time.Duration `json:"disruptionDeletionTimeout" yaml:"disruptionDeletionTimeout"`
FinalizerDeletionDelay time.Duration `json:"finalizerDeletionDelay" yaml:"finalizerDeletionDelay"`
Expand Down Expand Up @@ -552,6 +553,12 @@ func New(client corev1client.ConfigMapInterface, logger *zap.SugaredLogger, osAr
return cfg, err
}

mainFS.BoolVar(&cfg.Controller.RequireDisruptionCronTarget, "require-disruption-cron-target", false, "Reject disruption crons on create if their target cannot be found")

if err := viper.BindPFlag("controller.requireDisruptionCronTarget", mainFS.Lookup("require-disruption-cron-target")); err != nil {
return cfg, err
}

mainFS.BoolVar(&cfg.Controller.DisruptionRolloutEnabled, "disruption-rollout-enabled", false, "Enable the DisruptionRollout CRD and its controller")

if err := viper.BindPFlag("controller.disruptionRolloutEnabled", mainFS.Lookup("disruption-rollout-enabled")); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions controllers/disruption_cron_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *DisruptionCronReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, nil
}

disruptions, err := GetChildDisruptions(ctx, r.Client, r.log, instance.Namespace, DisruptionCronNameLabel, instance.Name)
disruptions, err := chaosv1beta1.GetChildDisruptions(ctx, r.Client, r.log, instance.Namespace, chaosv1beta1.DisruptionCronNameLabel, instance.Name)
if err != nil {
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func (r *DisruptionCronReconciler) Reconcile(ctx context.Context, req ctrl.Reque
r.log.Infow("processing current run", "currentRun", missedRun.Format(time.UnixDate))

// Create disruption for current run
disruption, err := CreateDisruptionFromTemplate(ctx, r.Client, r.Scheme, instance, &instance.Spec.TargetResource, &instance.Spec.DisruptionTemplate, missedRun, r.log)
disruption, err := chaosv1beta1.CreateDisruptionFromTemplate(ctx, r.Client, r.Scheme, instance, &instance.Spec.TargetResource, &instance.Spec.DisruptionTemplate, missedRun, r.log)

if err != nil {
r.log.Warnw("unable to construct disruption from template", "err", err)
Expand Down Expand Up @@ -230,7 +230,7 @@ func (r *DisruptionCronReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// updateLastScheduleTime updates the LastScheduleTime in the status of a DisruptionCron instance
// based on the most recent schedule time among the given disruptions.
func (r *DisruptionCronReconciler) updateLastScheduleTime(ctx context.Context, instance *chaosv1beta1.DisruptionCron, disruptions *chaosv1beta1.DisruptionList) error {
mostRecentScheduleTime := GetMostRecentScheduleTime(r.log, disruptions) // find the last run so we can update the status
mostRecentScheduleTime := chaosv1beta1.GetMostRecentScheduleTime(r.log, disruptions) // find the last run so we can update the status
if !mostRecentScheduleTime.IsZero() {
instance.Status.LastScheduleTime = &metav1.Time{Time: mostRecentScheduleTime}
return r.Client.Status().Update(ctx, instance)
Expand All @@ -246,7 +246,7 @@ func (r *DisruptionCronReconciler) updateLastScheduleTime(ctx context.Context, i
// - error: Represents any error that occurred during the execution of the function.
func (r *DisruptionCronReconciler) updateTargetResourcePreviouslyMissing(ctx context.Context, instance *chaosv1beta1.DisruptionCron) (bool, bool, error) {
disruptionCronDeleted := false
targetResourceExists, err := CheckTargetResourceExists(ctx, r.Client, &instance.Spec.TargetResource, instance.Namespace)
targetResourceExists, err := chaosv1beta1.CheckTargetResourceExists(ctx, r.Client, &instance.Spec.TargetResource, instance.Namespace)

if err != nil {
return targetResourceExists, disruptionCronDeleted, err
Expand Down
8 changes: 4 additions & 4 deletions controllers/disruption_rollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (r *DisruptionRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Re
return ctrl.Result{}, nil
}

disruptions, err := GetChildDisruptions(ctx, r.Client, r.log, instance.Namespace, DisruptionRolloutNameLabel, instance.Name)
disruptions, err := chaosv1beta1.GetChildDisruptions(ctx, r.Client, r.log, instance.Namespace, chaosv1beta1.DisruptionRolloutNameLabel, instance.Name)
if err != nil {
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func (r *DisruptionRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Re

// Create disruption
scheduledTime := time.Now()
disruption, err := CreateDisruptionFromTemplate(ctx, r.Client, r.Scheme, instance, &instance.Spec.TargetResource, &instance.Spec.DisruptionTemplate, scheduledTime, r.log)
disruption, err := chaosv1beta1.CreateDisruptionFromTemplate(ctx, r.Client, r.Scheme, instance, &instance.Spec.TargetResource, &instance.Spec.DisruptionTemplate, scheduledTime, r.log)

if err != nil {
r.log.Warnw("unable to construct disruption from template", "err", err)
Expand Down Expand Up @@ -174,7 +174,7 @@ func (r *DisruptionRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Re
// updateLastScheduleTime updates the LastScheduleTime in the status of a DisruptionRollout instance
// based on the most recent schedule time among the given disruptions.
func (r *DisruptionRolloutReconciler) updateLastScheduleTime(ctx context.Context, instance *chaosv1beta1.DisruptionRollout, disruptions *chaosv1beta1.DisruptionList) error {
mostRecentScheduleTime := GetMostRecentScheduleTime(r.log, disruptions) // find the last run so we can update the status
mostRecentScheduleTime := chaosv1beta1.GetMostRecentScheduleTime(r.log, disruptions) // find the last run so we can update the status
if !mostRecentScheduleTime.IsZero() {
instance.Status.LastScheduleTime = &metav1.Time{Time: mostRecentScheduleTime}
return r.Client.Status().Update(ctx, instance)
Expand All @@ -190,7 +190,7 @@ func (r *DisruptionRolloutReconciler) updateLastScheduleTime(ctx context.Context
// - error: Represents any error that occurred during the execution of the function.
func (r *DisruptionRolloutReconciler) updateTargetResourcePreviouslyMissing(ctx context.Context, instance *chaosv1beta1.DisruptionRollout) (bool, bool, error) {
disruptionRolloutDeleted := false
targetResourceExists, err := CheckTargetResourceExists(ctx, r.Client, &instance.Spec.TargetResource, instance.Namespace)
targetResourceExists, err := chaosv1beta1.CheckTargetResourceExists(ctx, r.Client, &instance.Spec.TargetResource, instance.Namespace)

if err != nil {
return targetResourceExists, disruptionRolloutDeleted, err
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func main() {
DefaultCronDelayedStartTolerance: cfg.Controller.DefaultCronDelayedStartTolerance,
MinimumCronFrequency: cfg.Controller.MinimumCronFrequency,
DefaultDurationFlag: cfg.Controller.DefaultDuration,
RequireDisruptionCronTarget: cfg.Controller.RequireDisruptionCronTarget,
MetricsSink: disruptionCronMetricsSink,
}

Expand Down
1 change: 1 addition & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type SetupWebhookWithManagerConfig struct {
EnableSafemodeFlag bool
AllowNodeLevel bool
AllowNodeFailure bool
RequireDisruptionCronTarget bool
DisabledDisruptions []string
DeleteOnlyFlag bool
HandlerEnabledFlag bool
Expand Down