// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025 Datadog, Inc.

package controllers

import (
	"context"
	"fmt"
	"time"

	chaosv1beta1 "github.com/DataDog/chaos-controller/api/v1beta1"
	cLog "github.com/DataDog/chaos-controller/log"
	"github.com/DataDog/chaos-controller/o11y/metrics"
	chaostypes "github.com/DataDog/chaos-controller/types"
	"github.com/robfig/cron"
	"go.uber.org/zap"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

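// DisruptionCronTags holds the metric tags attached to DisruptionCron metrics. It is a
// package-level slice that is rebuilt from the instance being processed on every reconciliation.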
var DisruptionCronTags = []string{}

type DisruptionCronReconciler struct {
	Client                         client.Client
	Scheme                         *runtime.Scheme
	BaseLog                        *zap.SugaredLogger
	log                            *zap.SugaredLogger
	MetricsSink                    metrics.Sink
	FinalizerDeletionDelay         time.Duration
	TargetResourceMissingThreshold time.Duration
}

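// Reconcile performs a single reconciliation pass for a DisruptionCron: it manages the
// finalizer lifecycle, verifies that the target resource still exists, honors spec.paused,
// and creates a child Disruption when a scheduled run is due.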
func (r *DisruptionCronReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
	r.log = r.BaseLog.With(cLog.DisruptionCronNamespaceKey, req.Namespace, cLog.DisruptionCronNameKey, req.Name)
	r.log.Info("Reconciling DisruptionCron")

	// reconcile metrics
	r.handleMetricSinkError(r.MetricsSink.MetricReconcile())

	instance := &chaosv1beta1.DisruptionCron{}

	defer func(tsStart time.Time) {
		tags := []string{}
		if instance.Name != "" {
			tags = append(tags, "disruptionCronName:"+instance.Name, "disruptionCronNamespace:"+instance.Namespace)
		}

		r.handleMetricSinkError(r.MetricsSink.MetricReconcileDuration(time.Since(tsStart), tags))
	}(time.Now())

	// Fetch DisruptionCron instance
	if err := r.Client.Get(ctx, req.NamespacedName, instance); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

r.log.Infow("fetched last known history", "history", instance.Status.History)
DisruptionCronTags = []string{"disruptionCronName:" + instance.Name, "disruptionCronNamespace:" + instance.Namespace, "targetName:" + instance.Spec.TargetResource.Name}
// DisruptionCron being deleted
if !instance.DeletionTimestamp.IsZero() {
// the instance is being deleted, remove finalizer avec finalizerDeletionDelay
if controllerutil.ContainsFinalizer(instance, chaostypes.DisruptionCronFinalizer) {
if instance.IsReadyToRemoveFinalizer(r.FinalizerDeletionDelay) {
// we reach this code when all the cleanup pods have succeeded and we waited for finalizerDeletionDelay
// we can remove the finalizer and let the resource being garbage collected
r.log.Infow("removing disruptioncron finalizer")
controllerutil.RemoveFinalizer(instance, chaostypes.DisruptionCronFinalizer)
if err := r.Client.Update(ctx, instance); err != nil {
return ctrl.Result{}, fmt.Errorf("error removing disruptioncron finalizer: %w", err)
}
return ctrl.Result{}, nil
}
// waiting for finalizerDeletionDelay before removing finalizer
requeueAfter := r.FinalizerDeletionDelay
r.log.Infow(fmt.Sprintf("requeuing to remove finalizer after %s", requeueAfter))
return ctrl.Result{Requeue: true, RequeueAfter: requeueAfter}, nil
}
return ctrl.Result{}, nil
}
updated := controllerutil.AddFinalizer(instance, chaostypes.DisruptionCronFinalizer)
if updated {
if err := r.Client.Update(ctx, instance); err != nil {
return ctrl.Result{}, fmt.Errorf("error adding disruptioncron finalizer: %w", err)
}
}
	// Update the DisruptionCron status based on the presence of the target resource
	// If the target resource has been missing for longer than the TargetResourceMissingThreshold, delete the instance
	targetResourceExists, instanceDeleted, err := r.updateTargetResourcePreviouslyMissing(ctx, instance)
	if err != nil {
		// Log error and requeue if status update or deletion fails
		r.log.Errorw("failed to handle target resource status", "err", err)
		return ctrl.Result{}, err
	}

	if instanceDeleted {
		// Skip reconciliation since the instance has been deleted
		return ctrl.Result{}, nil
	}

	disruptions, err := chaosv1beta1.GetChildDisruptions(ctx, r.Client, r.log, instance.Namespace, chaosv1beta1.DisruptionCronNameLabel, instance.Name)
	if err != nil {
		return ctrl.Result{}, nil
	}

	// Update the DisruptionCron status with the time when the last disruption was successfully scheduled
	if err := r.updateLastScheduleTime(ctx, instance, disruptions); err != nil {
		r.log.Errorw("unable to update LastScheduleTime of DisruptionCron status", "err", err)
		return ctrl.Result{}, err
	}

	if instance.Spec.Paused {
		r.handleMetricSinkError(r.MetricsSink.MetricPausedCron(DisruptionCronTags))
		r.log.Debugw("disruptioncron has been paused, will not resume until spec.paused is false")

		return ctrl.Result{}, nil
	}

	// Get next scheduled run or time of unprocessed run
	// If multiple unmet start times exist, start the last one
	missedRun, nextRun, err := r.getNextSchedule(instance, time.Now())
	if err != nil {
		r.log.Errorw("unable to figure out DisruptionCron schedule", "err", err)
		// Don't requeue until schedule update is received
		return ctrl.Result{}, nil
	}

	// Calculate next requeue time
	scheduledResult := ctrl.Result{RequeueAfter: time.Until(nextRun)}
	requeueTime := scheduledResult.RequeueAfter.Round(time.Second)

	r.log.Infow("calculated next scheduled run", "nextRun", nextRun.Format(time.UnixDate), "now", time.Now().Format(time.UnixDate))

	// Run a new disruption if the following conditions are met:
	// 1. It's on schedule
	// 2. The target resource is available
	// 3. It's not blocked by another disruption already running
	// 4. It's not past the deadline
	if missedRun.IsZero() {
		r.log.Infow(fmt.Sprintf("no missed runs detected, scheduling next check in %s", requeueTime))

		return scheduledResult, nil
	}

	if !targetResourceExists {
		r.log.Infow(fmt.Sprintf("target resource is missing, scheduling next check in %s", requeueTime))

		return scheduledResult, nil
	}

	if len(disruptions.Items) > 0 {
		r.log.Infow(fmt.Sprintf("cannot start a new disruption as a prior one is still running, scheduling next check in %s", requeueTime), "numActiveDisruptions", len(disruptions.Items))

		return scheduledResult, nil
	}

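	// Treat the missed run as expired if it is older than spec.delayedStartTolerance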
	tooLate := false
	if instance.Spec.DelayedStartTolerance.Duration() > 0 {
		tooLate = missedRun.Add(instance.Spec.DelayedStartTolerance.Duration()).Before(time.Now())
	}

	if tooLate {
		r.handleMetricSinkError(r.MetricsSink.MetricTooLate(DisruptionCronTags))
		r.log.Infow(fmt.Sprintf("missed schedule to start a disruption at %s, scheduling next check in %s", missedRun, requeueTime))

		return scheduledResult, nil
	}

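	// All guards passed: we are on schedule, the target exists, no prior disruption is
	// still running, and we are within the delayedStartTolerance window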
r.log.Infow("processing current run", "currentRun", missedRun.Format(time.UnixDate))
// Create disruption for current run
disruption, err := chaosv1beta1.CreateDisruptionFromTemplate(ctx, r.Client, r.Scheme, instance, &instance.Spec.TargetResource, &instance.Spec.DisruptionTemplate, missedRun, r.log)
if err != nil {
r.log.Warnw("unable to construct disruption from template", "err", err)
// Don't requeue until update to the spec is received
return scheduledResult, nil
}
if err := r.Client.Create(ctx, disruption); err != nil {
r.log.Warnw("unable to create Disruption for DisruptionCron", "disruption", disruption, "err", err)
return ctrl.Result{}, err
}
r.handleMetricSinkError(r.MetricsSink.MetricDisruptionScheduled(append(DisruptionCronTags, "disruptionName:"+disruption.Name)))
r.log.Infow("created Disruption for DisruptionCron run", cLog.DisruptionNameKey, disruption.Name)
// ------------------------------------------------------------------ //
// If this process restarts at this point (after posting a disruption, but
// before updating the status), we might try to start the disruption again
// the next time. To prevent this, we use the same disruption name for every
// execution, acting as a lock to prevent creating the disruption twice.
// Add the start time of the just initiated disruption to the status
instance.Status.LastScheduleTime = &metav1.Time{Time: missedRun}
// Add to history, then ensure only the last MaxHistoryLen items are kept
instance.Status.History = append(instance.Status.History, chaosv1beta1.DisruptionCronTrigger{
Name: instance.ObjectMeta.Name,
Kind: instance.TypeMeta.Kind,
CreatedAt: *instance.Status.LastScheduleTime,
})
if len(instance.Status.History) > chaosv1beta1.MaxHistoryLen {
instance.Status.History = instance.Status.History[len(instance.Status.History)-chaosv1beta1.MaxHistoryLen:]
}
r.log.Debugw("updating instance Status lastScheduleTime and history",
"lastScheduleTime", instance.Status.LastScheduleTime, "history", instance.Status.History)
if err := r.Client.Status().Update(ctx, instance); err != nil {
r.log.Warnw("unable to update LastScheduleTime of DisruptionCron status", "err", err)
return ctrl.Result{}, err
}
return scheduledResult, nil
}
// updateLastScheduleTime updates the LastScheduleTime in the status of a DisruptionCron instance
// based on the most recent schedule time among the given disruptions.
func (r *DisruptionCronReconciler) updateLastScheduleTime(ctx context.Context, instance *chaosv1beta1.DisruptionCron, disruptions *chaosv1beta1.DisruptionList) error {
	mostRecentScheduleTime := chaosv1beta1.GetMostRecentScheduleTime(r.log, disruptions) // find the last run so we can update the status
	if !mostRecentScheduleTime.IsZero() {
		instance.Status.LastScheduleTime = &metav1.Time{Time: mostRecentScheduleTime}

		return r.Client.Status().Update(ctx, instance)
	}

	return nil // No need to update if mostRecentScheduleTime is zero
}

// updateTargetResourcePreviouslyMissing updates the status when the target resource was previously missing.
// The function returns three values:
//   - bool: indicates whether the target resource is currently found.
//   - bool: indicates whether the disruptioncron was deleted because the target resource was missing for more than the expiration duration.
//   - error: any error that occurred during the execution of the function.
func (r *DisruptionCronReconciler) updateTargetResourcePreviouslyMissing(ctx context.Context, instance *chaosv1beta1.DisruptionCron) (bool, bool, error) {
	disruptionCronDeleted := false

	targetResourceExists, err := chaosv1beta1.CheckTargetResourceExists(ctx, r.Client, &instance.Spec.TargetResource, instance.Namespace)
	if err != nil {
		return targetResourceExists, disruptionCronDeleted, err
	}

	if !targetResourceExists {
		r.log.Warnw("target does not exist, this schedule will be deleted if that continues")

		if instance.Status.TargetResourcePreviouslyMissing == nil {
			r.log.Warnw("target is missing for the first time, updating status")

			return targetResourceExists, disruptionCronDeleted, r.handleTargetResourceFirstMissing(ctx, instance)
		}

		if time.Since(instance.Status.TargetResourcePreviouslyMissing.Time) > r.TargetResourceMissingThreshold {
			r.log.Warnw("target has been missing for longer than the configured threshold, deleting this schedule",
				"timeMissing", time.Since(instance.Status.TargetResourcePreviouslyMissing.Time))

			disruptionCronDeleted = true

			return targetResourceExists, disruptionCronDeleted, r.handleTargetResourceMissingPastExpiration(ctx, instance)
		}

		r.handleMetricSinkError(r.MetricsSink.MetricTargetMissing(time.Since(instance.Status.TargetResourcePreviouslyMissing.Time), DisruptionCronTags))
	} else if instance.Status.TargetResourcePreviouslyMissing != nil {
		r.log.Infow("target was previously missing, but is now present; updating the status accordingly")

		r.handleMetricSinkError(r.MetricsSink.MetricMissingTargetFound(DisruptionCronTags))

		return targetResourceExists, disruptionCronDeleted, r.handleTargetResourceNowPresent(ctx, instance)
	}

	return targetResourceExists, disruptionCronDeleted, nil
}

// handleTargetResourceFirstMissing handles the scenario when the target resource is missing for the first time.
// It updates the status of the DisruptionCron instance.
func (r *DisruptionCronReconciler) handleTargetResourceFirstMissing(ctx context.Context, instance *chaosv1beta1.DisruptionCron) error {
	instance.Status.TargetResourcePreviouslyMissing = &metav1.Time{Time: time.Now()}
	if err := r.Client.Status().Update(ctx, instance); err != nil {
		return fmt.Errorf("failed to update status: %w", err)
	}

	return nil
}

// handleTargetResourceMissingPastExpiration handles the scenario when the target resource has been missing for more than the expiration period.
// It deletes the DisruptionCron instance.
func (r *DisruptionCronReconciler) handleTargetResourceMissingPastExpiration(ctx context.Context, instance *chaosv1beta1.DisruptionCron) error {
	if err := r.Client.Delete(ctx, instance); err != nil {
		return fmt.Errorf("failed to delete instance: %w", err)
	}

	r.handleMetricSinkError(r.MetricsSink.MetricMissingTargetDeleted(DisruptionCronTags))

	return nil
}

// handleTargetResourceNowPresent handles the scenario when the target resource was previously missing but is now present.
// It updates the status of the DisruptionCron instance.
func (r *DisruptionCronReconciler) handleTargetResourceNowPresent(ctx context.Context, instance *chaosv1beta1.DisruptionCron) error {
	instance.Status.TargetResourcePreviouslyMissing = nil
	if err := r.Client.Status().Update(ctx, instance); err != nil {
		return fmt.Errorf("failed to update status: %w", err)
	}

	return nil
}

// getNextSchedule calculates the next scheduled time for a DisruptionCron instance based on its cron schedule and the current time.
// It returns the last missed schedule time, the next scheduled time, and any error encountered while parsing the schedule.
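// For example (illustrative values): with a schedule of "*/5 * * * *", a LastScheduleTime of
// 12:00 and now at 12:12, the last missed run is 12:10 and the next scheduled time is 12:15.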
func (r *DisruptionCronReconciler) getNextSchedule(instance *chaosv1beta1.DisruptionCron, now time.Time) (lastMissed time.Time, next time.Time, err error) {
	sched, err := cron.ParseStandard(instance.Spec.Schedule)
	if err != nil {
		r.log.Errorw("Unparseable schedule", "schedule", instance.Spec.Schedule, "err", err)
		return time.Time{}, time.Time{}, err
	}

	var earliestTime time.Time
	if instance.Status.LastScheduleTime != nil {
		earliestTime = instance.Status.LastScheduleTime.Time
	} else {
		earliestTime = instance.ObjectMeta.CreationTimestamp.Time
	}

	if earliestTime.After(now) {
		r.log.Warnw("getNextSchedule has found itself in the past", "earliestTime", earliestTime.GoString(), "now", now.GoString())
		return time.Time{}, sched.Next(now), nil
	}

	for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
		lastMissed = t
	}

	r.handleMetricSinkError(r.MetricsSink.MetricNextScheduledTime(time.Until(sched.Next(now)), DisruptionCronTags))

	return lastMissed, sched.Next(now), nil
}

// SetupWithManager sets up the current reconciler with the given manager
func (r *DisruptionCronReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&chaosv1beta1.DisruptionCron{}).
		Complete(r)
}

// handleMetricSinkError logs the given metric sink error if it is not nil
func (r *DisruptionCronReconciler) handleMetricSinkError(err error) {
	if err != nil {
		r.log.Errorw("error sending a metric", "error", err)
	}
}