Skip to content

Commit 93adf87

Browse files
authored
Merge pull request #12993 from sbueringer/pr-improve-wait-for-cache
🌱 Improve wait for cache
2 parents aa484c5 + 157403f commit 93adf87

File tree

6 files changed

+167
-28
lines changed

6 files changed

+167
-28
lines changed

internal/controllers/machinedeployment/machinedeployment_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c
354354
// Keep trying to get the MachineSet. This will force the cache to update and prevent any future reconciliation of
355355
// the MachineDeployment to reconcile with an outdated list of MachineSets which could lead to unwanted creation of
356356
// a duplicate MachineSet.
357-
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet creation", ms); err != nil {
357+
if err := clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "MachineSet creation", ms); err != nil {
358358
return err
359359
}
360360

internal/controllers/machinedeployment/machinedeployment_sync.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
3434
"sigs.k8s.io/cluster-api/internal/controllers/machinedeployment/mdutil"
35+
clientutil "sigs.k8s.io/cluster-api/internal/util/client"
3536
"sigs.k8s.io/cluster-api/util/collections"
3637
v1beta1conditions "sigs.k8s.io/cluster-api/util/conditions/deprecated/v1beta1"
3738
"sigs.k8s.io/cluster-api/util/patch"
@@ -334,6 +335,7 @@ func (r *Reconciler) cleanupDeployment(ctx context.Context, oldMSs []*clusterv1.
334335

335336
sort.Sort(mdutil.MachineSetsByCreationTimestamp(cleanableMSes))
336337

338+
machineSetsDeleted := []*clusterv1.MachineSet{}
337339
for i := range cleanableMSCount {
338340
ms := cleanableMSes[i]
339341
if ms.Spec.Replicas == nil {
@@ -351,10 +353,12 @@ func (r *Reconciler) cleanupDeployment(ctx context.Context, oldMSs []*clusterv1.
351353
r.recorder.Eventf(deployment, corev1.EventTypeWarning, "FailedDelete", "Failed to delete MachineSet %q: %v", ms.Name, err)
352354
return errors.Wrapf(err, "failed to delete MachineSet %s (cleanup of old MachineSets)", klog.KObj(ms))
353355
}
356+
machineSetsDeleted = append(machineSetsDeleted, ms)
357+
354358
// Note: We intentionally log after Delete because we want this log line to show up only after DeletionTimestamp has been set.
355359
log.Info(fmt.Sprintf("MachineSet %s deleting (cleanup of old MachineSets)", ms.Name), "MachineSet", klog.KObj(ms))
356360
r.recorder.Eventf(deployment, corev1.EventTypeNormal, "SuccessfulDelete", "Deleted MachineSet %q", ms.Name)
357361
}
358362

359-
return nil
363+
return clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "MachineSet deletion (cleanup of old MachineSet)", machineSetsDeleted...)
360364
}

internal/controllers/machineset/machineset_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -896,7 +896,7 @@ func (r *Reconciler) createMachines(ctx context.Context, s *scope, machinesToAdd
896896
}
897897

898898
// Wait for cache update to ensure following reconcile gets latest change.
899-
return ctrl.Result{}, clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "Machine creation", machinesAdded...)
899+
return ctrl.Result{}, clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "Machine creation", machinesAdded...)
900900
}
901901

902902
func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDelete int) (ctrl.Result, error) {
@@ -944,7 +944,7 @@ func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDel
944944
}
945945

946946
// Wait for cache update to ensure following reconcile gets latest change.
947-
if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion", machinesDeleted...); err != nil {
947+
if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion (scale down)", machinesDeleted...); err != nil {
948948
errs = append(errs, err)
949949
}
950950
if len(errs) > 0 {

internal/controllers/topology/cluster/reconcile_state.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,7 @@ func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope
693693
// Wait until MachineDeployment is visible in the cache.
694694
// Note: We have to do this because otherwise using a cached client in current state could
695695
// miss a newly created MachineDeployment (because the cache might be stale).
696-
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineDeployment creation", md.Object); err != nil {
696+
if err := clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "MachineDeployment creation", md.Object); err != nil {
697697
return err
698698
}
699699

@@ -1016,7 +1016,7 @@ func (r *Reconciler) createMachinePool(ctx context.Context, s *scope.Scope, mp *
10161016
// Wait until MachinePool is visible in the cache.
10171017
// Note: We have to do this because otherwise using a cached client in current state could
10181018
// miss a newly created MachinePool (because the cache might be stale).
1019-
return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachinePool creation", mp.Object)
1019+
return clientutil.WaitForObjectsToBeAddedToTheCache(ctx, r.Client, "MachinePool creation", mp.Object)
10201020
}
10211021

10221022
// updateMachinePool updates a MachinePool. Also updates the corresponding objects if necessary.

internal/util/client/client.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,20 @@ var (
4949
// all passed in objects with at least the passed in resourceVersion.
5050
// This is done by retrieving objects from the cache via the client and then comparing resourceVersions.
5151
// Note: This func will update the passed in objects while polling.
52+
// Note: resourceVersion must be set on the passed in objects.
5253
// Note: The generic parameter enforces that all objects have the same type.
5354
func WaitForCacheToBeUpToDate[T client.Object](ctx context.Context, c client.Client, action string, objs ...T) error {
5455
return waitFor(ctx, c, action, checkIfObjectUpToDate, objs...)
5556
}
5657

58+
// WaitForObjectsToBeAddedToTheCache waits until the cache is up-to-date in the sense of that the
59+
// passed in objects exist in the cache.
60+
// Note: This func will update the passed in objects while polling.
61+
// Note: The generic parameter enforces that all objects have the same type.
62+
func WaitForObjectsToBeAddedToTheCache[T client.Object](ctx context.Context, c client.Client, action string, objs ...T) error {
63+
return waitFor(ctx, c, action, checkIfObjectAdded, objs...)
64+
}
65+
5766
// WaitForObjectsToBeDeletedFromTheCache waits until the cache is up-to-date in the sense of that the
5867
// passed in objects have been either removed from the cache or they have a deletionTimestamp set.
5968
// Note: This func will update the passed in objects while polling.
@@ -64,22 +73,21 @@ func WaitForObjectsToBeDeletedFromTheCache[T client.Object](ctx context.Context,
6473

6574
// checkIfObjectUpToDate checks if an object is up-to-date and returns an error if it is not.
6675
func checkIfObjectUpToDate(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) {
76+
if desiredObj.MinimumResourceVersion == "" {
77+
// Unexpected error occurred: resourceVersion not set on passed in object (not retryable).
78+
return false, errors.Errorf("%s: cannot compare with invalid resourceVersion: resourceVersion not set",
79+
klog.KObj(desiredObj.Object))
80+
}
81+
6782
if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil {
6883
if apierrors.IsNotFound(err) {
69-
// Object is not yet in the cache (retryable).
70-
return true, err
84+
// Done, object was deleted in the meantime.
85+
return false, nil
7186
}
7287
// Unexpected error occurred (not retryable).
7388
return false, err
7489
}
7590

76-
if desiredObj.MinimumResourceVersion == "" {
77-
// Done, if MinimumResourceVersion is empty, as it is enough if the object exists in the cache.
78-
// Note: This can happen when the ServerSidePatchHelper is used to create an object as the ServerSidePatchHelper
79-
// does not update the object after Apply and accordingly resourceVersion remains empty.
80-
return false, nil
81-
}
82-
8391
cmp, err := compareResourceVersion(desiredObj.Object.GetResourceVersion(), desiredObj.MinimumResourceVersion)
8492
if err != nil {
8593
// Unexpected error occurred: invalid resourceVersion (not retryable).
@@ -96,6 +104,20 @@ func checkIfObjectUpToDate(ctx context.Context, c client.Client, desiredObj desi
96104
return false, nil
97105
}
98106

107+
func checkIfObjectAdded(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) {
108+
if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil {
109+
if apierrors.IsNotFound(err) {
110+
// Object is not yet in the cache (retryable).
111+
return true, err
112+
}
113+
// Unexpected error occurred (not retryable).
114+
return false, err
115+
}
116+
117+
// Done, object exists in the cache.
118+
return false, nil
119+
}
120+
99121
func checkIfObjectDeleted(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) {
100122
if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil {
101123
if apierrors.IsNotFound(err) {

internal/util/client/client_test.go

Lines changed: 126 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,23 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
5656
{
5757
name: "no-op if no objects are passed in",
5858
},
59+
{
60+
name: "error if passed in objects have no resourceVersion set",
61+
objs: []client.Object{
62+
machine("machine-1", "", nil),
63+
machine("machine-2", "", nil),
64+
},
65+
clientResponses: map[client.ObjectKey][]client.Object{
66+
{Namespace: metav1.NamespaceDefault, Name: "machine-1"}: {
67+
machine("machine-1", "1", nil),
68+
},
69+
{Namespace: metav1.NamespaceDefault, Name: "machine-2"}: {
70+
machine("machine-2", "2", nil),
71+
},
72+
},
73+
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine update: " +
74+
"default/machine-1: cannot compare with invalid resourceVersion: resourceVersion not set",
75+
},
5976
{
6077
name: "error if passed in objects have invalid resourceVersion",
6178
objs: []client.Object{
@@ -70,7 +87,7 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
7087
machine("machine-2", "2", nil),
7188
},
7289
},
73-
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: " +
90+
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine update: " +
7491
"default/machine-1: cannot compare with invalid resourceVersion: current: 1, expected to be >= invalidResourceVersion: resource version is not well formed: invalidResourceVersion",
7592
},
7693
{
@@ -87,35 +104,30 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
87104
machine("machine-2", "invalidResourceVersion", nil),
88105
},
89106
},
90-
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: " +
107+
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine update: " +
91108
"default/machine-1: cannot compare with invalid resourceVersion: current: invalidResourceVersion, expected to be >= 1: resource version is not well formed: invalidResourceVersion",
92109
},
93110
{
94-
name: "error if objects never show up in the cache",
111+
name: "success if objects are never visible in the cache (deleted before WaitForCacheToBeUpToDate is called)",
95112
objs: []client.Object{
96113
machine("machine-1", "1", nil),
97114
machine("machine-2", "2", nil),
98115
machine("machine-3", "3", nil),
99116
machine("machine-4", "4", nil),
100117
},
101118
clientResponses: map[client.ObjectKey][]client.Object{},
102-
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: timed out: [" +
103-
"machines.cluster.x-k8s.io \"machine-1\" not found, " +
104-
"machines.cluster.x-k8s.io \"machine-2\" not found, " +
105-
"machines.cluster.x-k8s.io \"machine-3\" not found, " +
106-
"machines.cluster.x-k8s.io \"machine-4\" not found]",
107119
},
108120
{
109121
name: "success if objects are instantly up-to-date",
110122
objs: []client.Object{
111-
machine("machine-1", "", nil),
123+
machine("machine-1", "1", nil),
112124
machine("machine-2", "2", nil),
113125
machine("machine-3", "3", nil),
114126
machine("machine-4", "4", nil),
115127
},
116128
clientResponses: map[client.ObjectKey][]client.Object{
117129
{Namespace: metav1.NamespaceDefault, Name: "machine-1"}: {
118-
// For this object it's enough if it shows up, exact resourceVersion doesn't matter.
130+
// This object has an even newer resourceVersion.
119131
machine("machine-1", "5", nil),
120132
},
121133
{Namespace: metav1.NamespaceDefault, Name: "machine-2"}: {
@@ -133,14 +145,13 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
133145
{
134146
name: "success if objects are up-to-date after a few tries",
135147
objs: []client.Object{
136-
machine("machine-1", "", nil),
148+
machine("machine-1", "1", nil),
137149
machine("machine-2", "10", nil),
138150
machine("machine-3", "11", nil),
139151
machine("machine-4", "12", nil),
140152
},
141153
clientResponses: map[client.ObjectKey][]client.Object{
142154
{Namespace: metav1.NamespaceDefault, Name: "machine-1"}: {
143-
// For this object it's enough if it shows up, exact resourceVersion doesn't matter.
144155
machine("machine-1", "4", nil),
145156
},
146157
{Namespace: metav1.NamespaceDefault, Name: "machine-2"}: {
@@ -195,7 +206,109 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) {
195206
},
196207
})
197208

198-
err := WaitForCacheToBeUpToDate(t.Context(), fakeClient, "Machine creation", tt.objs...)
209+
err := WaitForCacheToBeUpToDate(t.Context(), fakeClient, "Machine update", tt.objs...)
210+
if tt.wantErr != "" {
211+
g.Expect(err).To(HaveOccurred())
212+
g.Expect(err.Error()).To(Equal(tt.wantErr))
213+
} else {
214+
g.Expect(err).ToNot(HaveOccurred())
215+
}
216+
})
217+
}
218+
}
219+
220+
func Test_WaitForObjectsToBeAddedToTheCache(t *testing.T) {
221+
// Modify timeout to speed up test
222+
waitBackoff = wait.Backoff{
223+
Duration: 25 * time.Microsecond,
224+
Cap: 2 * time.Second,
225+
Factor: 1.2,
226+
Steps: 5,
227+
}
228+
229+
tests := []struct {
230+
name string
231+
objs []client.Object
232+
clientResponses map[client.ObjectKey][]client.Object
233+
wantErr string
234+
}{
235+
{
236+
name: "no-op if no objects are passed in",
237+
},
238+
{
239+
name: "error if objects never show up in the cache",
240+
objs: []client.Object{
241+
machine("machine-1", "1", nil),
242+
machine("machine-2", "2", nil),
243+
machine("machine-3", "3", nil),
244+
machine("machine-4", "4", nil),
245+
},
246+
clientResponses: map[client.ObjectKey][]client.Object{},
247+
wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: timed out: [" +
248+
"machines.cluster.x-k8s.io \"machine-1\" not found, " +
249+
"machines.cluster.x-k8s.io \"machine-2\" not found, " +
250+
"machines.cluster.x-k8s.io \"machine-3\" not found, " +
251+
"machines.cluster.x-k8s.io \"machine-4\" not found]",
252+
},
253+
{
254+
name: "success if objects instantly show up in the cache",
255+
objs: []client.Object{
256+
machine("machine-1", "1", nil),
257+
machine("machine-2", "2", nil),
258+
machine("machine-3", "3", nil),
259+
machine("machine-4", "4", nil),
260+
},
261+
clientResponses: map[client.ObjectKey][]client.Object{
262+
{Namespace: metav1.NamespaceDefault, Name: "machine-1"}: {
263+
// This object has an even newer resourceVersion.
264+
machine("machine-1", "5", nil),
265+
},
266+
{Namespace: metav1.NamespaceDefault, Name: "machine-2"}: {
267+
machine("machine-2", "2", nil),
268+
},
269+
{Namespace: metav1.NamespaceDefault, Name: "machine-3"}: {
270+
machine("machine-3", "3", nil),
271+
},
272+
{Namespace: metav1.NamespaceDefault, Name: "machine-4"}: {
273+
// This object has an even newer resourceVersion.
274+
machine("machine-4", "6", nil),
275+
},
276+
},
277+
},
278+
}
279+
280+
for _, tt := range tests {
281+
t.Run(tt.name, func(t *testing.T) {
282+
g := NewWithT(t)
283+
284+
scheme := runtime.NewScheme()
285+
_ = clusterv1.AddToScheme(scheme)
286+
287+
callCounter := map[client.ObjectKey]int{}
288+
fakeClient := interceptor.NewClient(fake.NewClientBuilder().WithScheme(scheme).Build(), interceptor.Funcs{
289+
Get: func(ctx context.Context, _ client.WithWatch, key client.ObjectKey, obj client.Object, _ ...client.GetOption) error {
290+
if len(tt.clientResponses) == 0 || len(tt.clientResponses[key]) == 0 {
291+
return apierrors.NewNotFound(schema.GroupResource{
292+
Group: clusterv1.GroupVersion.Group,
293+
Resource: "machines",
294+
}, key.Name)
295+
}
296+
297+
currentCall := callCounter[key]
298+
currentCall = min(currentCall, len(tt.clientResponses[key])-1)
299+
300+
// Write back the modified object so callers can access the patched object.
301+
if err := scheme.Convert(tt.clientResponses[key][currentCall], obj, ctx); err != nil {
302+
return errors.Wrapf(err, "unexpected error: failed to get")
303+
}
304+
305+
callCounter[key]++
306+
307+
return nil
308+
},
309+
})
310+
311+
err := WaitForObjectsToBeAddedToTheCache(t.Context(), fakeClient, "Machine creation", tt.objs...)
199312
if tt.wantErr != "" {
200313
g.Expect(err).To(HaveOccurred())
201314
g.Expect(err.Error()).To(Equal(tt.wantErr))

0 commit comments

Comments
 (0)