From a8ed3745634e2f4dbfab18a51e9f3c31ffbaf6e2 Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Mon, 9 Dec 2024 23:50:53 +0800 Subject: [PATCH] drift detection and takeover implementation #1 --- apis/cluster/v1beta1/zz_generated.deepcopy.go | 2 +- .../v1beta1/zz_generated.deepcopy.go | 2 +- apis/v1alpha1/zz_generated.deepcopy.go | 2 +- pkg/controllers/workapplier/controller.go | 671 ++++++++++++++++++ .../workapplier/controller_test.go | 345 +++++++++ pkg/controllers/workapplier/utils.go | 245 +++++++ pkg/controllers/workapplier/utils_test.go | 504 +++++++++++++ pkg/scheduler/framework/framework.go | 2 +- pkg/scheduler/framework/framework_test.go | 2 +- .../defaulter/clusterresourceplacement.go | 27 +- .../clusterresourceplacement_test.go | 20 +- pkg/utils/defaulter/work.go | 11 +- pkg/utils/defaulter/work_test.go | 27 +- .../parallelizer/errorflag.go | 0 .../parallelizer/errorflag_test.go | 0 .../parallelizer/parallelizer.go | 0 .../parallelizer/parallelizer_test.go | 0 test/apis/v1alpha1/zz_generated.deepcopy.go | 2 +- 18 files changed, 1833 insertions(+), 29 deletions(-) create mode 100644 pkg/controllers/workapplier/controller.go create mode 100644 pkg/controllers/workapplier/controller_test.go create mode 100644 pkg/controllers/workapplier/utils.go create mode 100644 pkg/controllers/workapplier/utils_test.go rename pkg/{scheduler/framework => utils}/parallelizer/errorflag.go (100%) rename pkg/{scheduler/framework => utils}/parallelizer/errorflag_test.go (100%) rename pkg/{scheduler/framework => utils}/parallelizer/parallelizer.go (100%) rename pkg/{scheduler/framework => utils}/parallelizer/parallelizer_test.go (100%) diff --git a/apis/cluster/v1beta1/zz_generated.deepcopy.go b/apis/cluster/v1beta1/zz_generated.deepcopy.go index 6d06cb15b..6c25f0189 100644 --- a/apis/cluster/v1beta1/zz_generated.deepcopy.go +++ b/apis/cluster/v1beta1/zz_generated.deepcopy.go @@ -10,7 +10,7 @@ Licensed under the MIT license. package v1beta1 import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) diff --git a/apis/placement/v1beta1/zz_generated.deepcopy.go b/apis/placement/v1beta1/zz_generated.deepcopy.go index 803ac0287..497454532 100644 --- a/apis/placement/v1beta1/zz_generated.deepcopy.go +++ b/apis/placement/v1beta1/zz_generated.deepcopy.go @@ -10,7 +10,7 @@ Licensed under the MIT license. package v1beta1 import ( - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" ) diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 0d4061551..ac4844274 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -11,7 +11,7 @@ package v1alpha1 import ( corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) diff --git a/pkg/controllers/workapplier/controller.go b/pkg/controllers/workapplier/controller.go new file mode 100644 index 000000000..3442faad4 --- /dev/null +++ b/pkg/controllers/workapplier/controller.go @@ -0,0 +1,671 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package workapplier + +import ( + "context" + "fmt" + "time" + + "go.uber.org/atomic" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrloption "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/utils/controller" + "go.goms.io/fleet/pkg/utils/defaulter" + "go.goms.io/fleet/pkg/utils/parallelizer" +) + +const ( + minRequestAfterDuration = time.Second * 5 +) + +// Reconciler reconciles a Work object. +type Reconciler struct { + hubClient client.Client + workNameSpace string + spokeDynamicClient dynamic.Interface + spokeClient client.Client + restMapper meta.RESTMapper + recorder record.EventRecorder + concurrentReconciles int + joined *atomic.Bool + parallelizer *parallelizer.Parallerlizer + + availabilityCheckRequeueAfter time.Duration + driftCheckRequeueAfter time.Duration +} + +// NewReconciler creates a new reconciler. +func NewReconciler( + hubClient client.Client, workNameSpace string, + spokeDynamicClient dynamic.Interface, spokeClient client.Client, restMapper meta.RESTMapper, + recorder record.EventRecorder, + concurrentReconciles int, + workerCount int, + availabilityCheckRequestAfter time.Duration, + driftCheckRequestAfter time.Duration, +) *Reconciler { + acRequestAfter := availabilityCheckRequestAfter + if acRequestAfter < minRequestAfterDuration { + klog.V(2).InfoS("Availability check requeue after duration is too short; set to the longer default", "availabilityCheckRequestAfter", acRequestAfter) + acRequestAfter = minRequestAfterDuration + } + + dcRequestAfter := driftCheckRequestAfter + if dcRequestAfter < minRequestAfterDuration { + klog.V(2).InfoS("Drift check requeue after duration is too short; set to the longer default", "driftCheckRequestAfter", dcRequestAfter) + dcRequestAfter = minRequestAfterDuration + } + + return &Reconciler{ + hubClient: hubClient, + spokeDynamicClient: spokeDynamicClient, + spokeClient: spokeClient, + restMapper: restMapper, + recorder: recorder, + concurrentReconciles: concurrentReconciles, + parallelizer: parallelizer.NewParallelizer(workerCount), + workNameSpace: workNameSpace, + joined: atomic.NewBool(false), + availabilityCheckRequeueAfter: acRequestAfter, + driftCheckRequeueAfter: dcRequestAfter, + } +} + +type manifestProcessingAppliedResultType string + +const ( + // The result types and descriptions for processing failures. 
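+	// Note that these result types also serve as the Reason values of the per-manifest Applied
+	// conditions; the unit tests in this package use them in exactly that way.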
+ ManifestProcessingApplyResultTypeDecodingErred manifestProcessingAppliedResultType = "DecodingErred" + ManifestProcessingApplyResultTypeFoundGenerateName manifestProcessingAppliedResultType = "FoundGenerateName" + ManifestProcessingApplyResultTypeDuplicated manifestProcessingAppliedResultType = "Duplicated" + ManifestProcessingApplyResultTypeFailedToApply manifestProcessingAppliedResultType = "FailedToApply" + + // The result type and description for successful processing attempts. + ManifestProcessingApplyResultTypeApplied manifestProcessingAppliedResultType = "Applied" + + ManifestProcessingApplyResultTypeAppliedDescription = "Manifest has been applied successfully" +) + +type manifestProcessingBundle struct { + manifest *fleetv1beta1.Manifest + id *fleetv1beta1.WorkResourceIdentifier + manifestObj *unstructured.Unstructured + gvr *schema.GroupVersionResource + applyResTyp manifestProcessingAppliedResultType + applyErr error + firstDriftedTimestamp *metav1.Time + firstDiffedTimestamp *metav1.Time +} + +// Reconcile implement the control loop logic for Work object. +func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + if !r.joined.Load() { + klog.V(2).InfoS("Work applier has not started yet", "work", req.NamespacedName) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + startTime := time.Now() + klog.V(2).InfoS("Work applier reconciliation starts", "work", req.NamespacedName) + defer func() { + latency := time.Since(startTime).Milliseconds() + klog.V(2).InfoS("Work applier reconciliation ends", "work", req.NamespacedName, "latency", latency) + }() + + // Retrieve the Work object. + work := &fleetv1beta1.Work{} + err := r.hubClient.Get(ctx, req.NamespacedName, work) + switch { + case apierrors.IsNotFound(err): + klog.V(2).InfoS("Work object has been deleted", "work", req.NamespacedName) + return ctrl.Result{}, nil + case err != nil: + klog.ErrorS(err, "Failed to retrieve the work", "work", req.NamespacedName) + return ctrl.Result{}, controller.NewAPIServerError(true, err) + } + + workRef := klog.KObj(work) + + // Garbage collect the AppliedWork object if the Work object has been deleted. + if !work.DeletionTimestamp.IsZero() { + klog.V(2).InfoS("Work object has been marked for deletion; start garbage collection", work.Kind, workRef) + return r.garbageCollectAppliedWork(ctx, work) + } + + // ensure that the appliedWork and the finalizer exist + appliedWork, err := r.ensureAppliedWork(ctx, work) + if err != nil { + return ctrl.Result{}, err + } + expectedAppliedWorkOwnerRef := &metav1.OwnerReference{ + APIVersion: fleetv1beta1.GroupVersion.String(), + Kind: fleetv1beta1.AppliedWorkKind, + Name: appliedWork.GetName(), + UID: appliedWork.GetUID(), + BlockOwnerDeletion: ptr.To(false), + } + + // Set the default values for the Work object to avoid additional validation logic in the + // later steps. + defaulter.SetDefaultsWork(work) + + // Note (chenyu1): as of Nov 8, 2024, Fleet has a bug which would assign an identifier with empty + // name to an object with generated name; since in earlier versions the identifier struct + // itself does not bookkeep generate name information, this would effectively lead to the loss + // of track of such objects, which would lead to repeatedly creating the same resource and/or + // apply failures in the work applier controller. 
+ // + // In the current version, for simplicity reasons, Fleet has dropped support for objects with + // generate names; any attempt to place such objects will yield an apply error. The code + // has been updated to automatically ignore identifiers with empty names so that reconciliation + // can resume in a previously erred setup. + // + // TO-DO (chenyu1): evaluate if it is necessary to add support for objects with generate + // names. + + // Prepare the bundles. + bundles := prepareManifestProcessingBundles(work) + + // Pre-process the manifests to apply. + // + // In this step, Fleet will: + // a) decode the manifests; and + // b) write ahead the manifest processing attempts; and + // c) remove any applied manifests left over from previous runs. + if err := r.preProcessManifests(ctx, bundles, work, expectedAppliedWorkOwnerRef); err != nil { + klog.ErrorS(err, "Failed to pre-process the manifests", "work", workRef) + return ctrl.Result{}, err + } + + // WIP (chenyu1): to be added in subsequent PRs. + return ctrl.Result{}, nil +} + +// preProcessManifests pre-processes manifests for the later ops. +func (r *Reconciler) preProcessManifests( + ctx context.Context, + bundles []*manifestProcessingBundle, + work *fleetv1beta1.Work, + expectedAppliedWorkOwnerRef *metav1.OwnerReference, +) error { + // Decode the manifests. + // Run the decoding in parallel to boost performance. + // + // This is concurrency safe as the bundles slice has been pre-allocated. + + // Prepare a child context. + // Cancel the child context anyway to avoid leaks. + childCtx, cancel := context.WithCancel(ctx) + defer cancel() + + doWork := func(pieces int) { + bundle := bundles[pieces] + if bundle.applyErr != nil { + // Skip a manifest if it cannot be processed. + return + } + + gvr, manifestObj, err := r.decodeManifest(bundle.manifest) + // Build the identifier. Note that this would return an identifier even if the decoding + // fails. + bundle.id = buildWorkResourceIdentifier(pieces, gvr, manifestObj) + if err != nil { + klog.ErrorS(err, "Failed to decode the manifest", "ordinal", pieces, "work", klog.KObj(work)) + bundle.applyErr = fmt.Errorf("failed to decode manifest: %w", err) + bundle.applyResTyp = ManifestProcessingApplyResultTypeDecodingErred + return + } + + // Reject objects with generate names. + if len(manifestObj.GetGenerateName()) > 0 { + klog.V(2).InfoS("Reject objects with generate names", "manifestObj", klog.KObj(manifestObj), "work", klog.KObj(work)) + bundle.applyErr = fmt.Errorf("objects with generate names are not supported") + bundle.applyResTyp = ManifestProcessingApplyResultTypeFoundGenerateName + return + } + + bundle.manifestObj = manifestObj + bundle.gvr = gvr + klog.V(2).InfoS("Decoded a manifest", + "manifestObj", klog.KObj(manifestObj), + "GVR", *gvr, + "work", klog.KObj(work)) + } + r.parallelizer.ParallelizeUntil(childCtx, len(bundles), doWork, "decodingManifests") + + // Write ahead the manifest processing attempts. In the process Fleet will also perform a + // cleanup to remove any left-over manifests that are applied from previous runs. + // + // This is necessary primarily for the reason that there exists a corner case where the agent + // could crash right after manifests are applied but before the status is properly updated, + // and upon the agent's restart, the list of manifests has changed (some manifests have been + // removed). This would lead to a situation where Fleet would lose track of the removed + // manifests. 
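+	// As an example, consider a ConfigMap that was applied in an earlier run but dropped from the
+	// Work spec while the agent was down; without a write-ahead record, nothing in the refreshed
+	// spec would point back to the applied object left behind on the member cluster.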
+ // + // To address this corner case, Fleet writes ahead the manifest processing attempts to Work + // object status, and through cross-reference, Fleet will be able to determine if there exists + // left-over manifests and perform clean-up as appropriate. + // + // To avoid conflicts (or the hassle of preparing individual patches), the status update is + // done in batch. + // + // Note that during the write-ahead process, Fleet will also perform a de-duplication check, which + // guarantees that no object with the same GVK + namespace/name combo would be processed + // twice. + // + // This check is done on the Work object scope, and is primarily added to address the case + // where duplicate objects might appear in a Fleet resource envelope and lead to unexpected + // behaviors. Duplication is a non-issue without Fleet resource envelopes, as the Fleet hub + // cluster Kubernetes API server already promises uniqueness when resources are first created. + return r.writeAheadManifestProcessingAttempts(ctx, bundles, work, expectedAppliedWorkOwnerRef) +} + +// writeAheadManifestProcessingAttempts helps write ahead manifest processing attempts so that +// Fleet can always track applied manifests, even upon untimely crashes. This method will +// also check for any leftover apply attempts from previous runs and clean them up (if the +// correspond object has been applied). +func (r *Reconciler) writeAheadManifestProcessingAttempts( + ctx context.Context, + bundles []*manifestProcessingBundle, + work *fleetv1beta1.Work, + expectedAppliedWorkOwnerRef *metav1.OwnerReference, +) error { + workRef := klog.KObj(work) + + // Prepare the status update (the new manifest conditions) for the write-ahead process. + // + // Note that even though we pre-allocate the slice, the length is set to 0. This is to + // accommodate the case where there might manifests that have failed pre-processing; + // such manifests will not be included in this round's status update. + manifestCondsForWA := make([]fleetv1beta1.ManifestCondition, 0, len(bundles)) + + // Prepare an query index of existing manifest conditions on the Work object for quicker + // lookups. + existingManifestCondQIdx := prepareExistingManifestCondQIdx(work.Status.ManifestConditions) + + // For each manifest, verify if it has been tracked in the newly prepared manifest conditions. + // This helps signal duplicated resources in the Work object. + checked := make(map[string]bool, len(bundles)) + for idx := range bundles { + bundle := bundles[idx] + if bundle.applyErr != nil { + // Skip a manifest if it cannot be pre-processed, i.e., it can only be identified by + // its ordinal. + // + // Such manifests would still be reported in the status (see the later parts of the + // reconciliation loop), it is just that they are not relevant in the write-ahead + // process. + continue + } + + // Register the manifest in the checked map; if another manifest with the same identifier + // has been checked before, Fleet would mark the current manifest as a duplicate and skip + // it. + // + // A side note: Golang does support using structs as map keys; preparing the string + // representations of structs as keys can help performance, though not by much. The reason + // why string representations are used here is not for performance, though; instead, it + // is to address the issue that for this comparison, ordinals should be ignored. 
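+		// For example, two manifests that both decode to the apps/v1 Deployment "ns-1/deploy-1"
+		// map to the same key, "GV=apps/v1, Kind=Deployment, Namespace=ns-1, Name=deploy-1",
+		// regardless of their ordinals in the Work spec, so the second occurrence is flagged as a
+		// duplicate below.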
+ wriStr, err := formatWRIString(bundle.id) + if err != nil { + // Normally this branch will never run as all manifests that cannot be decoded has been + // skipped in the check above. Here Fleet simply skips the manifest. + klog.ErrorS(err, "Failed to format the work resource identifier string", + "ordinal", idx, "work", workRef) + continue + } + if _, found := checked[wriStr]; found { + klog.V(2).InfoS("A duplicate manifest has been found", + "ordinal", idx, "work", workRef, "WRI", wriStr) + bundle.applyErr = fmt.Errorf("a duplicate manifest has been found") + bundle.applyResTyp = ManifestProcessingApplyResultTypeDuplicated + continue + } + checked[wriStr] = true + + // Prepare the manifest conditions for the write-ahead process. + manifestCondForWA := prepareManifestCondForWA(wriStr, bundle.id, work.Generation, existingManifestCondQIdx, work.Status.ManifestConditions) + manifestCondsForWA = append(manifestCondsForWA, manifestCondForWA) + + // Keep track of the last drift/diff observed timestamp. + if manifestCondForWA.DriftDetails != nil && !manifestCondForWA.DriftDetails.FirstDriftedObservedTime.IsZero() { + bundle.firstDriftedTimestamp = manifestCondForWA.DriftDetails.FirstDriftedObservedTime.DeepCopy() + } + if manifestCondForWA.DiffDetails != nil && !manifestCondForWA.DiffDetails.FirstDiffedObservedTime.IsZero() { + bundle.firstDiffedTimestamp = manifestCondForWA.DiffDetails.FirstDiffedObservedTime.DeepCopy() + } + + klog.V(2).InfoS("Prepared write-ahead information for a manifest", + "manifestObj", klog.KObj(bundle.manifestObj), "WRI", wriStr, "work", workRef) + } + + // As a shortcut, if there's no spec change in the Work object and the status indicates that + // a previous apply attempt has been recorded (**successful or not**), Fleet will skip the write-ahead + // op. + // + // Note that the shortcut happens after the manifest conditions for the write-ahead process + // are prepared; this is a must as Fleet needs to track certain information, specifically the + // first drifted/diffed timestamps (if any). + workAppliedCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied) + if workAppliedCond != nil && workAppliedCond.ObservedGeneration == work.Generation { + klog.V(2).InfoS("Attempt to apply the current set of manifests has been made before and the results have been recorded; will skip the write-ahead process", "work", workRef) + return nil + } + + // Identify any manifests from previous runs that might have been applied and are now left + // over in the member cluster. + leftOverManifests := findLeftOverManifests(manifestCondsForWA, existingManifestCondQIdx, work.Status.ManifestConditions) + if err := r.removeLeftOverManifests(ctx, leftOverManifests, expectedAppliedWorkOwnerRef); err != nil { + klog.Errorf("Failed to remove left-over manifests (work=%+v, leftOverManifestCount=%d, removalFailureCount=%d)", + workRef, len(leftOverManifests), len(err.Errors())) + return fmt.Errorf("failed to remove left-over manifests: %w", err) + } + klog.V(2).InfoS("Left-over manifests are found and removed", + "leftOverManifestCount", len(leftOverManifests), "work", workRef) + + // Update the status. + // + // Note that the Work object might have been refreshed by controllers on the hub cluster + // before this step runs; in this case the current reconciliation loop must be abandoned. + if work.Status.Conditions == nil { + // As a sanity check, set an empty set of conditions. Currently the API definition does + // not allow nil conditions. 
+ work.Status.Conditions = []metav1.Condition{} + } + work.Status.ManifestConditions = manifestCondsForWA + if err := r.hubClient.Status().Update(ctx, work); err != nil { + return controller.NewAPIServerError(false, fmt.Errorf("failed to write ahead manifest processing attempts: %w", err)) + } + klog.V(2).InfoS("Write-ahead process completed", "work", workRef) + + // Set the defaults again as the result yielded by the status update might have changed the object. + defaulter.SetDefaultsWork(work) + return nil +} + +// garbageCollectAppliedWork deletes the appliedWork and all the manifests associated with it from the cluster. +func (r *Reconciler) garbageCollectAppliedWork(ctx context.Context, work *fleetv1beta1.Work) (ctrl.Result, error) { + deletePolicy := metav1.DeletePropagationBackground + if !controllerutil.ContainsFinalizer(work, fleetv1beta1.WorkFinalizer) { + return ctrl.Result{}, nil + } + // delete the appliedWork which will remove all the manifests associated with it + // TODO: allow orphaned manifest + appliedWork := fleetv1beta1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{Name: work.Name}, + } + err := r.spokeClient.Delete(ctx, &appliedWork, &client.DeleteOptions{PropagationPolicy: &deletePolicy}) + switch { + case apierrors.IsNotFound(err): + klog.V(2).InfoS("The appliedWork is already deleted", "appliedWork", work.Name) + case err != nil: + klog.ErrorS(err, "Failed to delete the appliedWork", "appliedWork", work.Name) + return ctrl.Result{}, err + default: + klog.InfoS("Successfully deleted the appliedWork", "appliedWork", work.Name) + } + controllerutil.RemoveFinalizer(work, fleetv1beta1.WorkFinalizer) + return ctrl.Result{}, r.hubClient.Update(ctx, work, &client.UpdateOptions{}) +} + +// ensureAppliedWork makes sure that an associated appliedWork and a finalizer on the work resource exists on the cluster. 
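+// The AppliedWork object lives on the member cluster and owns every resource applied on behalf
+// of this Work; deleting it (see garbageCollectAppliedWork) is what allows those resources to be
+// garbage collected once the Work object itself is removed.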
+func (r *Reconciler) ensureAppliedWork(ctx context.Context, work *fleetv1beta1.Work) (*fleetv1beta1.AppliedWork, error) { + workRef := klog.KObj(work) + appliedWork := &fleetv1beta1.AppliedWork{} + hasFinalizer := false + if controllerutil.ContainsFinalizer(work, fleetv1beta1.WorkFinalizer) { + hasFinalizer = true + err := r.spokeClient.Get(ctx, types.NamespacedName{Name: work.Name}, appliedWork) + switch { + case apierrors.IsNotFound(err): + klog.ErrorS(err, "AppliedWork finalizer resource does not exist even with the finalizer, it will be recreated", "appliedWork", workRef.Name) + case err != nil: + klog.ErrorS(err, "Failed to retrieve the appliedWork ", "appliedWork", workRef.Name) + return nil, controller.NewAPIServerError(true, err) + default: + return appliedWork, nil + } + } + + // we create the appliedWork before setting the finalizer, so it should always exist unless it's deleted behind our back + appliedWork = &fleetv1beta1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: work.Name, + }, + Spec: fleetv1beta1.AppliedWorkSpec{ + WorkName: work.Name, + WorkNamespace: work.Namespace, + }, + } + if err := r.spokeClient.Create(ctx, appliedWork); err != nil && !apierrors.IsAlreadyExists(err) { + klog.ErrorS(err, "AppliedWork create failed", "appliedWork", workRef.Name) + return nil, err + } + if !hasFinalizer { + klog.InfoS("Add the finalizer to the work", "work", workRef) + work.Finalizers = append(work.Finalizers, fleetv1beta1.WorkFinalizer) + return appliedWork, r.hubClient.Update(ctx, work, &client.UpdateOptions{}) + } + klog.InfoS("Recreated the appliedWork resource", "appliedWork", workRef.Name) + return appliedWork, nil +} + +// Decodes the manifest JSON into a Kubernetes unstructured object. +func (r *Reconciler) decodeManifest(manifest *fleetv1beta1.Manifest) (*schema.GroupVersionResource, *unstructured.Unstructured, error) { + unstructuredObj := &unstructured.Unstructured{} + if err := unstructuredObj.UnmarshalJSON(manifest.Raw); err != nil { + return &schema.GroupVersionResource{}, nil, fmt.Errorf("failed to unmarshal JSON: %w", err) + } + + mapping, err := r.restMapper.RESTMapping(unstructuredObj.GroupVersionKind().GroupKind(), unstructuredObj.GroupVersionKind().Version) + if err != nil { + return &schema.GroupVersionResource{}, unstructuredObj, fmt.Errorf("failed to find GVR from member cluster client REST mapping: %w", err) + } + + return &mapping.Resource, unstructuredObj, nil +} + +// removeLeftOverManifests removes applied left-over manifests from the member cluster. +func (r *Reconciler) removeLeftOverManifests( + ctx context.Context, + leftOverManifests []fleetv1beta1.AppliedResourceMeta, + expectedAppliedWorkOwnerRef *metav1.OwnerReference, +) utilerrors.Aggregate { + // Remove all the manifests in parallel. + // + // This is concurrency safe as each worker processes its own applied manifest and writes + // to its own error slot. + + // Prepare a child context. + // Cancel the child context anyway to avoid leaks. + childCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // Pre-allocate the slice. + errs := make([]error, len(leftOverManifests)) + doWork := func(pieces int) { + appliedManifestMeta := leftOverManifests[pieces] + + // Remove the left-over manifest. 
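+		// The object is deleted outright only when the AppliedWork is its sole owner; otherwise
+		// Fleet merely drops its own owner reference (see removeOneLeftOverManifest).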
+ err := r.removeOneLeftOverManifest(ctx, appliedManifestMeta, expectedAppliedWorkOwnerRef) + if err != nil { + errs[pieces] = fmt.Errorf("failed to remove the left-over manifest (regular object): %w", err) + } + } + r.parallelizer.ParallelizeUntil(childCtx, len(leftOverManifests), doWork, "removeLeftOverManifests") + + return utilerrors.NewAggregate(errs) +} + +// removeOneLeftOverManifestWithGenerateName removes an applied manifest object that is left over +// in the member cluster. +func (r *Reconciler) removeOneLeftOverManifest( + ctx context.Context, + leftOverManifest fleetv1beta1.AppliedResourceMeta, + expectedAppliedWorkOwnerRef *metav1.OwnerReference, +) error { + // Build the GVR. + gvr := schema.GroupVersionResource{ + Group: leftOverManifest.Group, + Version: leftOverManifest.Version, + Resource: leftOverManifest.Resource, + } + manifestNamespace := leftOverManifest.Namespace + manifestName := leftOverManifest.Name + + inMemberClusterObj, err := r.spokeDynamicClient. + Resource(gvr). + Namespace(manifestNamespace). + Get(ctx, manifestName, metav1.GetOptions{}) + switch { + case err != nil && apierrors.IsNotFound(err): + // The object has been deleted from the member cluster; no further action is needed. + return nil + case err != nil: + // Failed to retrieve the object from the member cluster. + return fmt.Errorf("failed to retrieve the object from the member cluster (gvr=%+v, manifestObj=%+v): %w", gvr, klog.KRef(manifestNamespace, manifestName), err) + case inMemberClusterObj.GetDeletionTimestamp() != nil: + // The object has been marked for deletion; no further action is needed. + return nil + } + + // There are occasions, though rare, where the object has the same GVR + namespace + name + // combo but is not the applied object Fleet tries to find. This could happen if the object + // has been deleted and then re-created manually without Fleet's acknowledgement. In such cases + // Fleet would ignore the object, and this is not registered as an error. + if !isInMemberClusterObjectDerivedFromManifestObj(inMemberClusterObj, expectedAppliedWorkOwnerRef) { + // The object is not derived from the manifest object. + klog.V(2).InfoS("The object to remove is not derived from the manifest object; will not proceed with the removal", + "gvr", gvr, "manifestObj", + klog.KRef(manifestNamespace, manifestName), "inMemberClusterObj", klog.KObj(inMemberClusterObj), + "expectedAppliedWorkOwnerRef", *expectedAppliedWorkOwnerRef) + return nil + } + + switch { + case len(inMemberClusterObj.GetOwnerReferences()) > 1: + // Fleet is not the sole owner of the object; in this case, Fleet will only drop the + // ownership. + klog.V(2).InfoS("The object to remove is co-owned by other sources; Fleet will drop the ownership", + "gvr", gvr, "manifestObj", + klog.KRef(manifestNamespace, manifestName), "inMemberClusterObj", klog.KObj(inMemberClusterObj), + "expectedAppliedWorkOwnerRef", *expectedAppliedWorkOwnerRef) + removeOwnerRef(inMemberClusterObj, expectedAppliedWorkOwnerRef) + if _, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestNamespace).Update(ctx, inMemberClusterObj, metav1.UpdateOptions{}); err != nil && !apierrors.IsNotFound(err) { + // Failed to drop the ownership. 
+ return fmt.Errorf("failed to drop the ownership of the object (gvr=%+v, manifestObj=%+v, inMemberClusterObj=%+v, expectedAppliedWorkOwnerRef=%+v): %w", + gvr, klog.KRef(manifestNamespace, manifestName), klog.KObj(inMemberClusterObj), *expectedAppliedWorkOwnerRef, err) + } + default: + // Fleet is the sole owner of the object; in this case, Fleet will delete the object. + klog.V(2).InfoS("The object to remove is solely owned by Fleet; Fleet will delete the object", + "gvr", gvr, "manifestObj", + klog.KRef(manifestNamespace, manifestName), "inMemberClusterObj", klog.KObj(inMemberClusterObj), + "expectedAppliedWorkOwnerRef", *expectedAppliedWorkOwnerRef) + inMemberClusterObjUID := inMemberClusterObj.GetUID() + deleteOpts := metav1.DeleteOptions{ + Preconditions: &metav1.Preconditions{ + // Add a UID pre-condition to guard against the case where the object has changed + // right before the deletion request is sent. + // + // Technically speaking resource version based concurrency control should also be + // enabled here; Fleet drops the check to avoid conflicts; this is safe as the Fleet + // ownership is considered to be a reserved field and other changes on the object are + // irrelevant to this step. + UID: &inMemberClusterObjUID, + }, + } + if err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestNamespace).Delete(ctx, manifestName, deleteOpts); err != nil && !apierrors.IsNotFound(err) { + // Failed to delete the object from the member cluster. + return fmt.Errorf("failed to delete the object (gvr=%+v, manifestObj=%+v, inMemberClusterObj=%+v, expectedAppliedWorkOwnerRef=%+v): %w", + gvr, klog.KRef(manifestNamespace, manifestName), klog.KObj(inMemberClusterObj), *expectedAppliedWorkOwnerRef, err) + } + } + return nil +} + +// Join starts to reconcile +func (r *Reconciler) Join(_ context.Context) error { + if !r.joined.Load() { + klog.InfoS("Mark the apply work reconciler joined") + } + r.joined.Store(true) + return nil +} + +// Leave start +func (r *Reconciler) Leave(ctx context.Context) error { + var works fleetv1beta1.WorkList + if r.joined.Load() { + klog.InfoS("Mark the apply work reconciler left") + } + r.joined.Store(false) + // list all the work object we created in the member cluster namespace + listOpts := []client.ListOption{ + client.InNamespace(r.workNameSpace), + } + if err := r.hubClient.List(ctx, &works, listOpts...); err != nil { + klog.ErrorS(err, "Failed to list all the work object", "clusterNS", r.workNameSpace) + return client.IgnoreNotFound(err) + } + // we leave the resources on the member cluster for now + for _, work := range works.Items { + staleWork := work.DeepCopy() + if controllerutil.ContainsFinalizer(staleWork, fleetv1beta1.WorkFinalizer) { + controllerutil.RemoveFinalizer(staleWork, fleetv1beta1.WorkFinalizer) + if updateErr := r.hubClient.Update(ctx, staleWork, &client.UpdateOptions{}); updateErr != nil { + klog.ErrorS(updateErr, "Failed to remove the work finalizer from the work", + "clusterNS", r.workNameSpace, "work", klog.KObj(staleWork)) + return updateErr + } + } + } + klog.V(2).InfoS("Successfully removed all the work finalizers in the cluster namespace", + "clusterNS", r.workNameSpace, "number of work", len(works.Items)) + return nil +} + +// SetupWithManager wires up the controller. +func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + WithOptions(ctrloption.Options{ + MaxConcurrentReconciles: r.concurrentReconciles, + }). 
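+		// GenerationChangedPredicate filters out update events that do not bump
+		// metadata.generation; in particular, the status-only (write-ahead) updates made by this
+		// controller do not trigger another reconciliation on their own.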
+ For(&fleetv1beta1.Work{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Complete(r) +} diff --git a/pkg/controllers/workapplier/controller_test.go b/pkg/controllers/workapplier/controller_test.go new file mode 100644 index 000000000..cc1db1ad9 --- /dev/null +++ b/pkg/controllers/workapplier/controller_test.go @@ -0,0 +1,345 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package workapplier + +import ( + "context" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/utils/parallelizer" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" +) + +const ( + workName = "work-1" + + deployName = "deploy-1" + configMapName = "configmap-1" + nsName = "ns-1" + + nsNameTemplate = "ns-%s" +) + +var ( + appliedWorkOwnerRef = &metav1.OwnerReference{ + APIVersion: "placement.kubernetes-fleet.io/v1beta1", + Kind: "AppliedWork", + Name: workName, + UID: "uid", + } +) + +var ( + ignoreFieldTypeMetaInNamespace = cmpopts.IgnoreFields(corev1.Namespace{}, "TypeMeta") +) + +// TestRemoveLeftOverManifests tests the removeLeftOverManifests method. +func TestRemoveLeftOverManifests(t *testing.T) { + ctx := context.Background() + + additionalOwnerRef := &metav1.OwnerReference{ + APIVersion: "v1", + Kind: "SuperNamespace", + Name: "super-ns", + UID: "super-ns-uid", + } + + nsName0 := fmt.Sprintf(nsNameTemplate, "0") + nsName1 := fmt.Sprintf(nsNameTemplate, "1") + nsName2 := fmt.Sprintf(nsNameTemplate, "2") + nsName3 := fmt.Sprintf(nsNameTemplate, "3") + + testCases := []struct { + name string + leftOverManifests []fleetv1beta1.AppliedResourceMeta + inMemberClusterObjs []runtime.Object + wantInMemberClusterObjs []corev1.Namespace + wantRemovedInMemberClusterObjs []corev1.Namespace + }{ + { + name: "mixed", + leftOverManifests: []fleetv1beta1.AppliedResourceMeta{ + // The object is present. + { + WorkResourceIdentifier: *nsWRI(0, nsName0), + }, + // The object cannot be found. + { + WorkResourceIdentifier: *nsWRI(1, nsName1), + }, + // The object is not owned by Fleet. + { + WorkResourceIdentifier: *nsWRI(2, nsName2), + }, + // The object has multiple owners. 
+ { + WorkResourceIdentifier: *nsWRI(3, nsName3), + }, + }, + inMemberClusterObjs: []runtime.Object{ + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName0, + }, + }, + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName2, + OwnerReferences: []metav1.OwnerReference{ + *additionalOwnerRef, + }, + }, + }, + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName3, + OwnerReferences: []metav1.OwnerReference{ + *additionalOwnerRef, + *appliedWorkOwnerRef, + }, + }, + }, + }, + wantInMemberClusterObjs: []corev1.Namespace{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: nsName2, + OwnerReferences: []metav1.OwnerReference{ + *additionalOwnerRef, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: nsName3, + OwnerReferences: []metav1.OwnerReference{ + *additionalOwnerRef, + }, + }, + }, + }, + wantRemovedInMemberClusterObjs: []corev1.Namespace{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: nsName0, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fakeClient := fake.NewSimpleDynamicClient(scheme.Scheme, tc.inMemberClusterObjs...) + r := &Reconciler{ + spokeDynamicClient: fakeClient, + parallelizer: parallelizer.NewParallelizer(2), + } + if err := r.removeLeftOverManifests(ctx, tc.leftOverManifests, appliedWorkOwnerRef); err != nil { + t.Errorf("removeLeftOverManifests() = %v, want no error", err) + } + + for idx := range tc.wantInMemberClusterObjs { + wantNS := tc.wantInMemberClusterObjs[idx] + + gotUnstructured, err := fakeClient. + Resource(nsGVR). + Namespace(wantNS.GetNamespace()). + Get(ctx, wantNS.GetName(), metav1.GetOptions{}) + if err != nil { + t.Errorf("Get Namespace(%v) = %v, want no error", klog.KObj(&wantNS), err) + continue + } + + gotNS := wantNS.DeepCopy() + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(gotUnstructured.Object, &gotNS); err != nil { + t.Errorf("FromUnstructured() = %v, want no error", err) + } + + if diff := cmp.Diff(gotNS, &wantNS, ignoreFieldTypeMetaInNamespace); diff != "" { + t.Errorf("NS(%v) mismatches (-got +want):\n%s", klog.KObj(&wantNS), diff) + } + } + + for idx := range tc.wantRemovedInMemberClusterObjs { + wantRemovedNS := tc.wantRemovedInMemberClusterObjs[idx] + + gotUnstructured, err := fakeClient. + Resource(nsGVR). + Namespace(wantRemovedNS.GetNamespace()). + Get(ctx, wantRemovedNS.GetName(), metav1.GetOptions{}) + if err != nil { + t.Errorf("Get Namespace(%v) = %v, want no error", klog.KObj(&wantRemovedNS), err) + } + + gotRemovedNS := wantRemovedNS.DeepCopy() + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(gotUnstructured.Object, &gotRemovedNS); err != nil { + t.Errorf("FromUnstructured() = %v, want no error", err) + } + + if !gotRemovedNS.DeletionTimestamp.IsZero() { + t.Errorf("Namespace(%v) has not been deleted", klog.KObj(&wantRemovedNS)) + } + } + }) + } +} + +// TestRemoveOneLeftOverManifest tests the removeOneLeftOverManifest method. +func TestRemoveOneLeftOverManifest(t *testing.T) { + ctx := context.Background() + now := metav1.Now().Rfc3339Copy() + leftOverManifest := fleetv1beta1.AppliedResourceMeta{ + WorkResourceIdentifier: *nsWRI(0, nsName), + } + additionalOwnerRef := &metav1.OwnerReference{ + APIVersion: "v1", + Kind: "SuperNamespace", + Name: "super-ns", + UID: "super-ns-uid", + } + + testCases := []struct { + name string + // To simplify things, for this test Fleet uses a fixed concrete type. 
+ inMemberClusterObj *corev1.Namespace + wantInMemberClusterObj *corev1.Namespace + }{ + { + name: "not found", + }, + { + name: "already deleted", + inMemberClusterObj: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName, + DeletionTimestamp: &now, + }, + }, + wantInMemberClusterObj: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName, + DeletionTimestamp: &now, + }, + }, + }, + { + name: "not derived from manifest object", + inMemberClusterObj: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName, + OwnerReferences: []metav1.OwnerReference{ + *additionalOwnerRef, + }, + }, + }, + wantInMemberClusterObj: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName, + OwnerReferences: []metav1.OwnerReference{ + *additionalOwnerRef, + }, + }, + }, + }, + { + name: "multiple owners", + inMemberClusterObj: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName, + OwnerReferences: []metav1.OwnerReference{ + *additionalOwnerRef, + *appliedWorkOwnerRef, + }, + }, + }, + wantInMemberClusterObj: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName, + OwnerReferences: []metav1.OwnerReference{ + *additionalOwnerRef, + }, + }, + }, + }, + { + name: "deletion", + inMemberClusterObj: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: nsName, + OwnerReferences: []metav1.OwnerReference{ + *appliedWorkOwnerRef, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var fakeClient *fake.FakeDynamicClient + if tc.inMemberClusterObj != nil { + fakeClient = fake.NewSimpleDynamicClient(scheme.Scheme, tc.inMemberClusterObj) + } else { + fakeClient = fake.NewSimpleDynamicClient(scheme.Scheme) + } + + r := &Reconciler{ + spokeDynamicClient: fakeClient, + } + if err := r.removeOneLeftOverManifest(ctx, leftOverManifest, appliedWorkOwnerRef); err != nil { + t.Errorf("removeOneLeftOverManifest() = %v, want no error", err) + } + + if tc.inMemberClusterObj != nil { + var gotUnstructured *unstructured.Unstructured + var err error + // The method is expected to modify the object. + gotUnstructured, err = fakeClient. + Resource(nsGVR). + Namespace(tc.inMemberClusterObj.GetNamespace()). + Get(ctx, tc.inMemberClusterObj.GetName(), metav1.GetOptions{}) + switch { + case errors.IsNotFound(err) && tc.wantInMemberClusterObj == nil: + // The object is expected to be deleted. + return + case errors.IsNotFound(err): + // An object is expected to be found. + t.Errorf("Get(%v) = %v, want no error", klog.KObj(tc.inMemberClusterObj), err) + return + case err != nil: + // An unexpected error occurred. + t.Errorf("Get(%v) = %v, want no error", klog.KObj(tc.inMemberClusterObj), err) + return + } + + got := tc.wantInMemberClusterObj.DeepCopy() + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(gotUnstructured.Object, &got); err != nil { + t.Errorf("FromUnstructured() = %v, want no error", err) + return + } + + if diff := cmp.Diff(got, tc.wantInMemberClusterObj, ignoreFieldTypeMetaInNamespace); diff != "" { + t.Errorf("NS(%v) mismatches (-got +want):\n%s", klog.KObj(tc.inMemberClusterObj), diff) + } + return + } + }) + } +} diff --git a/pkg/controllers/workapplier/utils.go b/pkg/controllers/workapplier/utils.go new file mode 100644 index 000000000..9c86096d9 --- /dev/null +++ b/pkg/controllers/workapplier/utils.go @@ -0,0 +1,245 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. 
+*/ + +package workapplier + +import ( + "fmt" + "reflect" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" +) + +const ( + // A list of condition related values. + ManifestAppliedCondPreparingToProcessReason = "PreparingToProcess" + ManifestAppliedCondPreparingToProcessMessage = "The manifest is being prepared for processing." +) + +func prepareManifestProcessingBundles(work *fleetv1beta1.Work) []*manifestProcessingBundle { + // Pre-allocate the bundles. + bundles := make([]*manifestProcessingBundle, 0, len(work.Spec.Workload.Manifests)) + for idx := range work.Spec.Workload.Manifests { + manifest := work.Spec.Workload.Manifests[idx] + bundles = append(bundles, &manifestProcessingBundle{ + manifest: &manifest, + }) + } + return bundles +} + +// buildWorkResourceIdentifier builds a work resource identifier for a manifest. +// +// Note that if the manifest cannot be decoded/applied, this function will return an identifier with +// the available information on hand. +func buildWorkResourceIdentifier( + manifestIdx int, + gvr *schema.GroupVersionResource, + manifestObj *unstructured.Unstructured, +) *fleetv1beta1.WorkResourceIdentifier { + // The ordinal field is always set. + identifier := &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: manifestIdx, + } + + // Set the GVK, name, namespace, and generate name information if the manifest can be decoded + // as a Kubernetes unstructured object. + // + // Note that: + // * For cluster-scoped objects, the namespace field will be empty. + // * For objects with generated names, the name field will be empty. + // * For regular objects (i.e., objects with a pre-defined name), the generate name field will be empty. + if manifestObj != nil { + identifier.Group = manifestObj.GroupVersionKind().Group + identifier.Version = manifestObj.GroupVersionKind().Version + identifier.Kind = manifestObj.GetKind() + identifier.Name = manifestObj.GetName() + identifier.Namespace = manifestObj.GetNamespace() + } + + // Set the GVR information if the manifest object can be REST mapped. + if gvr != nil { + identifier.Resource = gvr.Resource + } + + return identifier +} + +// formatWRIString returns a string representation of a work resource identifier. +func formatWRIString(wri *fleetv1beta1.WorkResourceIdentifier) (string, error) { + switch { + case wri.Group == "" && wri.Version == "": + // The manifest object cannot be decoded, i.e., it can only be identified by its ordinal. + // + // This branch is added solely for completeness reasons; normally such objects would not + // be included in any cases that would require a WRI string formatting. + return "", fmt.Errorf("the manifest object can only be identified by its ordinal") + default: + // For a regular object, the string representation includes the actual name. + return fmt.Sprintf("GV=%s/%s, Kind=%s, Namespace=%s, Name=%s", + wri.Group, wri.Version, wri.Kind, wri.Namespace, wri.Name), nil + } +} + +func isInMemberClusterObjectDerivedFromManifestObj(inMemberClusterObj *unstructured.Unstructured, expectedAppliedWorkOwnerRef *metav1.OwnerReference) bool { + // Do a sanity check. + if inMemberClusterObj == nil { + return false + } + + // Verify if the owner reference still stands. 
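+	// reflect.DeepEqual requires an exact match on every field of the expected owner reference,
+	// including the AppliedWork's UID and the BlockOwnerDeletion flag, not just a matching name.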
+ curOwners := inMemberClusterObj.GetOwnerReferences() + for idx := range curOwners { + if reflect.DeepEqual(curOwners[idx], *expectedAppliedWorkOwnerRef) { + return true + } + } + return false +} + +func removeOwnerRef(obj *unstructured.Unstructured, expectedAppliedWorkOwnerRef *metav1.OwnerReference) { + ownerRefs := obj.GetOwnerReferences() + updatedOwnerRefs := make([]metav1.OwnerReference, 0, len(ownerRefs)) + + // Re-build the owner references; remove the given one from the list. + for idx := range ownerRefs { + if !reflect.DeepEqual(ownerRefs[idx], *expectedAppliedWorkOwnerRef) { + updatedOwnerRefs = append(updatedOwnerRefs, ownerRefs[idx]) + } + } + obj.SetOwnerReferences(updatedOwnerRefs) +} + +// prepareExistingManifestCondQIdx returns a map that allows quicker look up of a manifest +// condition given a work resource identifier. +func prepareExistingManifestCondQIdx(existingManifestConditions []fleetv1beta1.ManifestCondition) map[string]int { + existingManifestConditionQIdx := make(map[string]int) + for idx := range existingManifestConditions { + manifestCond := existingManifestConditions[idx] + + wriStr, err := formatWRIString(&manifestCond.Identifier) + if err != nil { + // There might be manifest conditions without a valid identifier in the existing set of + // manifest conditions (e.g., decoding error has occurred in the previous run). + // Fleet will skip these manifest conditions. This is not considered as an error. + continue + } + + existingManifestConditionQIdx[wriStr] = idx + } + return existingManifestConditionQIdx +} + +func prepareManifestCondForWA( + wriStr string, wri *fleetv1beta1.WorkResourceIdentifier, + workGeneration int64, + existingManifestCondQIdx map[string]int, + existingManifestConds []fleetv1beta1.ManifestCondition, +) fleetv1beta1.ManifestCondition { + // For each manifest to process, check if there is a corresponding entry in the existing set + // of manifest conditions. If so, Fleet will port information back to keep track of the + // previous processing results; otherwise, Fleet will report that it is preparing to process + // the manifest. + existingManifestConditionIdx, found := existingManifestCondQIdx[wriStr] + if found { + // The current manifest condition has a corresponding entry in the existing set of manifest + // conditions. + // + // Fleet simply ports the information back. + return existingManifestConds[existingManifestConditionIdx] + } + + // No corresponding entry is found in the existing set of manifest conditions. + // + // Prepare a manifest condition that indicates that Fleet is preparing to be process the manifest. + return fleetv1beta1.ManifestCondition{ + Identifier: *wri, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionFalse, + ObservedGeneration: workGeneration, + Reason: ManifestAppliedCondPreparingToProcessReason, + Message: ManifestAppliedCondPreparingToProcessMessage, + LastTransitionTime: metav1.Now(), + }, + }, + } +} + +// findLeftOverManifests returns the manifests that have been left over on the member cluster side. +func findLeftOverManifests( + manifestCondsForWA []fleetv1beta1.ManifestCondition, + existingManifestCondQIdx map[string]int, + existingManifestConditions []fleetv1beta1.ManifestCondition, +) []fleetv1beta1.AppliedResourceMeta { + // Build an index for quicker lookup in the newly prepared write-ahead manifest conditions. + // Here Fleet uses the string representations as map keys to omit ordinals from any lookup. 
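+	// The keys are produced by formatWRIString, the same helper used during the write-ahead step,
+	// so entries from the two sets can be cross-referenced directly.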
+ // + // Note that before this step, Fleet has already filtered out duplicate manifests. + manifestCondsForWAQIdx := make(map[string]int) + for idx := range manifestCondsForWA { + manifestCond := manifestCondsForWA[idx] + + wriStr, err := formatWRIString(&manifestCond.Identifier) + if err != nil { + // Normally this branch will never run as all manifests that cannot be decoded has been + // skipped before this function is called. Here Fleet simply skips the manifest as it + // has no effect on the process. + klog.ErrorS(err, "failed to format the work resource identifier string", "manifest", manifestCond.Identifier) + continue + } + + manifestCondsForWAQIdx[wriStr] = idx + } + + // For each manifest condition in the existing set of manifest conditions, check if + // there is a corresponding entry in the set of manifest conditions prepared for the write-ahead + // process. If not, Fleet will consider that the manifest has been left over on the member + // cluster side and should be removed. + + // Use an AppliedResourceMeta slice to allow code sharing. + leftOverManifests := []fleetv1beta1.AppliedResourceMeta{} + for existingManifestWRIStr, existingManifestCondIdx := range existingManifestCondQIdx { + _, found := manifestCondsForWAQIdx[existingManifestWRIStr] + if !found { + existingManifestCond := existingManifestConditions[existingManifestCondIdx] + // The current manifest condition does not have a corresponding entry in the set of manifest + // conditions prepared for the write-ahead process. + + // Verify if the manifest condition indicates that the manifest could have been + // applied. + applied := meta.FindStatusCondition(existingManifestCond.Conditions, fleetv1beta1.WorkConditionTypeApplied) + if applied.Status == metav1.ConditionTrue || applied.Reason == ManifestAppliedCondPreparingToProcessReason { + // Fleet assumes that the manifest has been applied if: + // a) it has an applied condition set to the True status; or + // b) it has an applied condition which signals that the object is preparing to be processed. + // + // Note that the manifest condition might not be up-to-date, so Fleet will not + // check on the generation information. + leftOverManifests = append(leftOverManifests, fleetv1beta1.AppliedResourceMeta{ + WorkResourceIdentifier: existingManifestCond.Identifier, + // UID information might not be available at this moment; the cleanup process + // will perform additional checks anyway to guard against the + // create-delete-recreate cases and/or same name but different setup cases. + // + // As a side note, it is true that the AppliedWork object status might have + // the UID information; Fleet cannot rely on that though, as the AppliedWork + // status is not guaranteed to be tracking the result of the last apply op. + // Should the Fleet agent restarts multiple times before it gets a chance to + // write the AppliedWork object statys, the UID information in the status + // might be several generations behind. + }) + } + } + } + return leftOverManifests +} diff --git a/pkg/controllers/workapplier/utils_test.go b/pkg/controllers/workapplier/utils_test.go new file mode 100644 index 000000000..20d3d5118 --- /dev/null +++ b/pkg/controllers/workapplier/utils_test.go @@ -0,0 +1,504 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. 
+*/ + +package workapplier + +import ( + "fmt" + "log" + "os" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/ptr" + + fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" +) + +var ( + nsGVR = schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + } + + deploy = &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: deployName, + Namespace: nsName, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: ptr.To(int32(1)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nginx", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "nginx", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + }, + } + deployUnstructured *unstructured.Unstructured + deployJSON []byte + + ns = &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: nsName, + }, + } + nsUnstructured *unstructured.Unstructured + nsJSON []byte +) + +var ( + lessFuncAppliedResourceMeta = func(i, j fleetv1beta1.AppliedResourceMeta) bool { + iStr := fmt.Sprintf("%s/%s/%s/%s/%s", i.Group, i.Version, i.Kind, i.Namespace, i.Name) + jStr := fmt.Sprintf("%s/%s/%s/%s/%s", j.Group, j.Version, j.Kind, j.Namespace, j.Name) + return iStr < jStr + } + + ignoreFieldConditionLTTMsg = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "Message") +) + +func TestMain(m *testing.M) { + // Add custom APIs to the runtime scheme. + if err := fleetv1beta1.AddToScheme(scheme.Scheme); err != nil { + log.Fatalf("failed to add custom APIs (placement/v1beta1) to the runtime scheme: %v", err) + } + + // Initialize the variables. + initializeVariables() + + os.Exit(m.Run()) +} + +func initializeVariables() { + var err error + + // Regular objects. + // Deployment. + deployGenericMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(deploy) + if err != nil { + log.Fatalf("failed to convert deployment to unstructured: %v", err) + } + deployUnstructured = &unstructured.Unstructured{Object: deployGenericMap} + + deployJSON, err = deployUnstructured.MarshalJSON() + if err != nil { + log.Fatalf("failed to marshal deployment to JSON: %v", err) + } + + // Namespace. 
+ nsGenericMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ns) + if err != nil { + log.Fatalf("failed to convert namespace to unstructured: %v", err) + } + nsUnstructured = &unstructured.Unstructured{Object: nsGenericMap} + nsJSON, err = nsUnstructured.MarshalJSON() + if err != nil { + log.Fatalf("failed to marshal namespace to JSON: %v", err) + } +} + +func nsWRI(ordinal int, nsName string) *fleetv1beta1.WorkResourceIdentifier { + return &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: ordinal, + Group: "", + Version: "v1", + Kind: "Namespace", + Resource: "namespaces", + Name: nsName, + } +} + +func deployWRI(ordinal int, nsName, deployName string) *fleetv1beta1.WorkResourceIdentifier { + return &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: ordinal, + Group: "apps", + Version: "v1", + Kind: "Deployment", + Resource: "deployments", + Name: deployName, + Namespace: nsName, + } +} + +func manifestAppliedCond(workGeneration int64, status metav1.ConditionStatus, reason, message string) metav1.Condition { + return metav1.Condition{ + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: status, + ObservedGeneration: workGeneration, + Reason: reason, + Message: message, + } +} + +// TestPrepareManifestProcessingBundles tests the prepareManifestProcessingBundles function. +func TestPrepareManifestProcessingBundles(t *testing.T) { + deployJSON := deployJSON + nsJSON := nsJSON + memberReservedNSName := "fleet-member-experimental" + + work := &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: memberReservedNSName, + }, + Spec: fleetv1beta1.WorkSpec{ + Workload: fleetv1beta1.WorkloadTemplate{ + Manifests: []fleetv1beta1.Manifest{ + { + RawExtension: runtime.RawExtension{ + Raw: nsJSON, + }, + }, + { + RawExtension: runtime.RawExtension{ + Raw: deployJSON, + }, + }, + }, + }, + }, + } + + bundles := prepareManifestProcessingBundles(work) + wantBundles := []*manifestProcessingBundle{ + { + manifest: &work.Spec.Workload.Manifests[0], + }, + { + manifest: &work.Spec.Workload.Manifests[1], + }, + } + if diff := cmp.Diff(bundles, wantBundles, cmp.AllowUnexported(manifestProcessingBundle{})); diff != "" { + t.Errorf("prepareManifestProcessingBundles() mismatches (-got +want):\n%s", diff) + } +} + +// TestBuildWorkResourceIdentifier tests the buildWorkResourceIdentifier function. 
+func TestBuildWorkResourceIdentifier(t *testing.T) { + testCases := []struct { + name string + manifestIdx int + gvr *schema.GroupVersionResource + manifestObj *unstructured.Unstructured + wantWRI *fleetv1beta1.WorkResourceIdentifier + }{ + { + name: "ordinal only", + manifestIdx: 0, + wantWRI: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + }, + { + name: "ordinal and manifest object", + manifestIdx: 1, + manifestObj: nsUnstructured, + wantWRI: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + Group: "", + Version: "v1", + Kind: "Namespace", + Name: nsName, + }, + }, + { + name: "ordinal, manifest object, and GVR", + manifestIdx: 2, + gvr: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "deployments", + }, + manifestObj: deployUnstructured, + wantWRI: deployWRI(2, nsName, deployName), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + wri := buildWorkResourceIdentifier(tc.manifestIdx, tc.gvr, tc.manifestObj) + if diff := cmp.Diff(wri, tc.wantWRI); diff != "" { + t.Errorf("buildWorkResourceIdentifier() mismatches (-got +want):\n%s", diff) + } + }) + } +} + +// TestFormatWRIString tests the formatWRIString function. +func TestFormatWRIString(t *testing.T) { + testCases := []struct { + name string + wri *fleetv1beta1.WorkResourceIdentifier + wantWRIString string + wantErred bool + }{ + { + name: "ordinal only", + wri: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + wantErred: true, + }, + { + name: "regular object", + wri: deployWRI(2, nsName, deployName), + wantWRIString: fmt.Sprintf("GV=apps/v1, Kind=Deployment, Namespace=%s, Name=%s", nsName, deployName), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + wriString, err := formatWRIString(tc.wri) + if tc.wantErred { + if err == nil { + t.Errorf("formatWRIString() = nil, want error") + } + return + } + if err != nil { + t.Fatalf("formatWRIString() = %v, want no error", err) + } + + if wriString != tc.wantWRIString { + t.Errorf("formatWRIString() mismatches: got %q, want %q", wriString, tc.wantWRIString) + } + }) + } +} + +// TestPrepareExistingManifestCondQIdx tests the prepareExistingManifestCondQIdx function. +func TestPrepareExistingManifestCondQIdx(t *testing.T) { + testCases := []struct { + name string + existingManifestConds []fleetv1beta1.ManifestCondition + wantQIdx map[string]int + }{ + { + name: "mixed", + existingManifestConds: []fleetv1beta1.ManifestCondition{ + { + Identifier: *nsWRI(0, nsName), + }, + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + }, + { + Identifier: *deployWRI(2, nsName, deployName), + }, + }, + wantQIdx: map[string]int{ + fmt.Sprintf("GV=/v1, Kind=Namespace, Namespace=, Name=%s", nsName): 0, + fmt.Sprintf("GV=apps/v1, Kind=Deployment, Namespace=%s, Name=%s", nsName, deployName): 2, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + qIdx := prepareExistingManifestCondQIdx(tc.existingManifestConds) + if diff := cmp.Diff(qIdx, tc.wantQIdx); diff != "" { + t.Errorf("prepareExistingManifestCondQIdx() mismatches (-got +want):\n%s", diff) + } + }) + } +} + +// TestPrepareManifestCondForWA tests the prepareManifestCondForWA function. 
+func TestPrepareManifestCondForWA(t *testing.T) { + workGeneration := int64(0) + + testCases := []struct { + name string + wriStr string + wri *fleetv1beta1.WorkResourceIdentifier + workGeneration int64 + existingManifestCondQIdx map[string]int + existingManifestConds []fleetv1beta1.ManifestCondition + wantManifestCondForWA *fleetv1beta1.ManifestCondition + }{ + { + name: "match found", + wriStr: fmt.Sprintf("GV=/v1, Kind=Namespace, Namespace=, Name=%s", nsName), + wri: nsWRI(0, nsName), + workGeneration: workGeneration, + existingManifestCondQIdx: map[string]int{ + fmt.Sprintf("GV=/v1, Kind=Namespace, Namespace=, Name=%s", nsName): 0, + }, + existingManifestConds: []fleetv1beta1.ManifestCondition{ + { + Identifier: *nsWRI(0, nsName), + Conditions: []metav1.Condition{ + manifestAppliedCond(workGeneration, metav1.ConditionTrue, string(ManifestProcessingApplyResultTypeApplied), ManifestProcessingApplyResultTypeAppliedDescription), + }, + }, + }, + wantManifestCondForWA: &fleetv1beta1.ManifestCondition{ + Identifier: *nsWRI(0, nsName), + Conditions: []metav1.Condition{ + manifestAppliedCond(workGeneration, metav1.ConditionTrue, string(ManifestProcessingApplyResultTypeApplied), ManifestProcessingApplyResultTypeAppliedDescription), + }, + }, + }, + { + name: "match not found", + wriStr: fmt.Sprintf("GV=apps/v1, Kind=Deployment, Namespace=%s, Name=%s", nsName, deployName), + wri: deployWRI(1, nsName, deployName), + workGeneration: workGeneration, + wantManifestCondForWA: &fleetv1beta1.ManifestCondition{ + Identifier: *deployWRI(1, nsName, deployName), + Conditions: []metav1.Condition{ + manifestAppliedCond(workGeneration, metav1.ConditionFalse, ManifestAppliedCondPreparingToProcessReason, ManifestAppliedCondPreparingToProcessMessage), + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + manifestCondForWA := prepareManifestCondForWA(tc.wriStr, tc.wri, tc.workGeneration, tc.existingManifestCondQIdx, tc.existingManifestConds) + if diff := cmp.Diff(&manifestCondForWA, tc.wantManifestCondForWA, ignoreFieldConditionLTTMsg); diff != "" { + t.Errorf("prepareManifestCondForWA() mismatches (-got +want):\n%s", diff) + } + }) + } +} + +// TestFindLeftOverManifests tests the findLeftOverManifests function. +func TestFindLeftOverManifests(t *testing.T) { + workGeneration0 := int64(0) + workGeneration1 := int64(1) + + nsName0 := fmt.Sprintf(nsNameTemplate, "0") + nsName1 := fmt.Sprintf(nsNameTemplate, "1") + nsName2 := fmt.Sprintf(nsNameTemplate, "2") + nsName3 := fmt.Sprintf(nsNameTemplate, "3") + nsName4 := fmt.Sprintf(nsNameTemplate, "4") + + testCases := []struct { + name string + manifestCondsForWA []fleetv1beta1.ManifestCondition + existingManifestCondQIdx map[string]int + existingManifestConditions []fleetv1beta1.ManifestCondition + wantLeftOverManifests []fleetv1beta1.AppliedResourceMeta + }{ + { + name: "mixed", + manifestCondsForWA: []fleetv1beta1.ManifestCondition{ + // New manifest. + { + Identifier: *nsWRI(0, nsName0), + Conditions: []metav1.Condition{ + manifestAppliedCond(workGeneration1, metav1.ConditionFalse, ManifestAppliedCondPreparingToProcessReason, ManifestAppliedCondPreparingToProcessMessage), + }, + }, + // Existing manifest. 
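+				// (This manifest also appears in the existing manifest conditions below, so it should not be reported as left over.)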
+ { + Identifier: *nsWRI(1, nsName1), + Conditions: []metav1.Condition{ + manifestAppliedCond(workGeneration0, metav1.ConditionTrue, string(ManifestProcessingApplyResultTypeApplied), ManifestProcessingApplyResultTypeAppliedDescription), + }, + }, + }, + existingManifestConditions: []fleetv1beta1.ManifestCondition{ + // Manifest condition that signals a decoding error. + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + }, + // Manifest condition that corresponds to the existing manifest. + { + Identifier: *nsWRI(1, nsName1), + Conditions: []metav1.Condition{ + manifestAppliedCond(workGeneration0, metav1.ConditionTrue, string(ManifestProcessingApplyResultTypeApplied), ManifestProcessingApplyResultTypeAppliedDescription), + }, + }, + // Manifest condition that corresponds to a previously applied and now gone manifest. + { + Identifier: *nsWRI(2, nsName2), + Conditions: []metav1.Condition{ + manifestAppliedCond(workGeneration0, metav1.ConditionTrue, string(ManifestProcessingApplyResultTypeApplied), ManifestProcessingApplyResultTypeAppliedDescription), + }, + }, + // Manifest condition that corresponds to a gone manifest that failed to be applied. + { + Identifier: *nsWRI(3, nsName3), + Conditions: []metav1.Condition{ + manifestAppliedCond(workGeneration0, metav1.ConditionFalse, string(ManifestProcessingApplyResultTypeFailedToApply), ""), + }, + }, + // Manifest condition that corresponds to a gone manifest that has been marked as to be applied (preparing to be processed). + { + Identifier: *nsWRI(4, nsName4), + Conditions: []metav1.Condition{ + manifestAppliedCond(workGeneration0, metav1.ConditionFalse, ManifestAppliedCondPreparingToProcessReason, ManifestAppliedCondPreparingToProcessMessage), + }, + }, + }, + existingManifestCondQIdx: map[string]int{ + fmt.Sprintf("GV=/v1, Kind=Namespace, Namespace=, Name=%s", nsName1): 1, + fmt.Sprintf("GV=/v1, Kind=Namespace, Namespace=, Name=%s", nsName2): 2, + fmt.Sprintf("GV=/v1, Kind=Namespace, Namespace=, Name=%s", nsName3): 3, + fmt.Sprintf("GV=/v1, Kind=Namespace, Namespace=, Name=%s", nsName4): 4, + }, + wantLeftOverManifests: []fleetv1beta1.AppliedResourceMeta{ + { + WorkResourceIdentifier: *nsWRI(2, nsName2), + }, + { + WorkResourceIdentifier: *nsWRI(4, nsName4), + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + leftOverManifests := findLeftOverManifests(tc.manifestCondsForWA, tc.existingManifestCondQIdx, tc.existingManifestConditions) + if diff := cmp.Diff(leftOverManifests, tc.wantLeftOverManifests, cmpopts.SortSlices(lessFuncAppliedResourceMeta)); diff != "" { + t.Errorf("findLeftOverManifests() mismatches (-got +want):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index e2349bcb6..3064c6589 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -29,10 +29,10 @@ import ( clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/scheduler/clustereligibilitychecker" - "go.goms.io/fleet/pkg/scheduler/framework/parallelizer" "go.goms.io/fleet/pkg/utils/annotations" "go.goms.io/fleet/pkg/utils/condition" "go.goms.io/fleet/pkg/utils/controller" + "go.goms.io/fleet/pkg/utils/parallelizer" ) const ( diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go index 07ba7b1ff..7599d80d9 100644 --- a/pkg/scheduler/framework/framework_test.go +++ 
b/pkg/scheduler/framework/framework_test.go @@ -29,7 +29,7 @@ import ( clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/scheduler/clustereligibilitychecker" - "go.goms.io/fleet/pkg/scheduler/framework/parallelizer" + "go.goms.io/fleet/pkg/utils/parallelizer" ) const ( diff --git a/pkg/utils/defaulter/clusterresourceplacement.go b/pkg/utils/defaulter/clusterresourceplacement.go index 3bfe9866b..e4e5bfbcb 100644 --- a/pkg/utils/defaulter/clusterresourceplacement.go +++ b/pkg/utils/defaulter/clusterresourceplacement.go @@ -80,16 +80,31 @@ func SetDefaultsClusterResourcePlacement(obj *fleetv1beta1.ClusterResourcePlacem if obj.Spec.Strategy.ApplyStrategy == nil { obj.Spec.Strategy.ApplyStrategy = &fleetv1beta1.ApplyStrategy{} } - if obj.Spec.Strategy.ApplyStrategy.Type == "" { - obj.Spec.Strategy.ApplyStrategy.Type = fleetv1beta1.ApplyStrategyTypeClientSideApply + SetDefaultsApplyStrategy(obj.Spec.Strategy.ApplyStrategy) + + if obj.Spec.RevisionHistoryLimit == nil { + obj.Spec.RevisionHistoryLimit = ptr.To(int32(DefaultRevisionHistoryLimitValue)) + } +} + +// SetDefaultsApplyStrategy sets the default values for an ApplyStrategy object. +func SetDefaultsApplyStrategy(obj *fleetv1beta1.ApplyStrategy) { + if obj.Type == "" { + obj.Type = fleetv1beta1.ApplyStrategyTypeClientSideApply } - if obj.Spec.Strategy.ApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeServerSideApply && obj.Spec.Strategy.ApplyStrategy.ServerSideApplyConfig == nil { - obj.Spec.Strategy.ApplyStrategy.ServerSideApplyConfig = &fleetv1beta1.ServerSideApplyConfig{ + if obj.Type == fleetv1beta1.ApplyStrategyTypeServerSideApply && obj.ServerSideApplyConfig == nil { + obj.ServerSideApplyConfig = &fleetv1beta1.ServerSideApplyConfig{ ForceConflicts: false, } } - if obj.Spec.RevisionHistoryLimit == nil { - obj.Spec.RevisionHistoryLimit = ptr.To(int32(DefaultRevisionHistoryLimitValue)) + if obj.ComparisonOption == "" { + obj.ComparisonOption = fleetv1beta1.ComparisonOptionTypePartialComparison + } + if obj.WhenToApply == "" { + obj.WhenToApply = fleetv1beta1.WhenToApplyTypeAlways + } + if obj.WhenToTakeOver == "" { + obj.WhenToTakeOver = fleetv1beta1.WhenToTakeOverTypeAlways } } diff --git a/pkg/utils/defaulter/clusterresourceplacement_test.go b/pkg/utils/defaulter/clusterresourceplacement_test.go index ff18371c3..7851d5a17 100644 --- a/pkg/utils/defaulter/clusterresourceplacement_test.go +++ b/pkg/utils/defaulter/clusterresourceplacement_test.go @@ -38,7 +38,10 @@ func TestSetDefaultsClusterResourcePlacement(t *testing.T) { UnavailablePeriodSeconds: ptr.To(DefaultUnavailablePeriodSeconds), }, ApplyStrategy: &fleetv1beta1.ApplyStrategy{ - Type: fleetv1beta1.ApplyStrategyTypeClientSideApply, + Type: fleetv1beta1.ApplyStrategyTypeClientSideApply, + ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, + WhenToApply: fleetv1beta1.WhenToApplyTypeAlways, + WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeAlways, }, }, RevisionHistoryLimit: ptr.To(int32(DefaultRevisionHistoryLimitValue)), @@ -69,7 +72,10 @@ func TestSetDefaultsClusterResourcePlacement(t *testing.T) { UnavailablePeriodSeconds: ptr.To(15), }, ApplyStrategy: &fleetv1beta1.ApplyStrategy{ - Type: fleetv1beta1.ApplyStrategyTypeClientSideApply, + Type: fleetv1beta1.ApplyStrategyTypeClientSideApply, + ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, + WhenToApply: fleetv1beta1.WhenToApplyTypeAlways, + WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeAlways, }, }, 
RevisionHistoryLimit: ptr.To(int32(10)), @@ -101,7 +107,10 @@ func TestSetDefaultsClusterResourcePlacement(t *testing.T) { UnavailablePeriodSeconds: ptr.To(15), }, ApplyStrategy: &fleetv1beta1.ApplyStrategy{ - Type: fleetv1beta1.ApplyStrategyTypeClientSideApply, + Type: fleetv1beta1.ApplyStrategyTypeClientSideApply, + ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, + WhenToApply: fleetv1beta1.WhenToApplyTypeAlways, + WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeAlways, }, }, RevisionHistoryLimit: ptr.To(int32(10)), @@ -131,7 +140,10 @@ func TestSetDefaultsClusterResourcePlacement(t *testing.T) { UnavailablePeriodSeconds: ptr.To(DefaultUnavailablePeriodSeconds), }, ApplyStrategy: &fleetv1beta1.ApplyStrategy{ - Type: fleetv1beta1.ApplyStrategyTypeServerSideApply, + Type: fleetv1beta1.ApplyStrategyTypeServerSideApply, + ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, + WhenToApply: fleetv1beta1.WhenToApplyTypeAlways, + WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeAlways, ServerSideApplyConfig: &fleetv1beta1.ServerSideApplyConfig{ ForceConflicts: false, }, diff --git a/pkg/utils/defaulter/work.go b/pkg/utils/defaulter/work.go index c16a8a741..3db4b75d8 100644 --- a/pkg/utils/defaulter/work.go +++ b/pkg/utils/defaulter/work.go @@ -13,14 +13,5 @@ func SetDefaultsWork(w *placementv1beta1.Work) { if w.Spec.ApplyStrategy == nil { w.Spec.ApplyStrategy = &placementv1beta1.ApplyStrategy{} } - - if w.Spec.ApplyStrategy.Type == "" { - w.Spec.ApplyStrategy.Type = placementv1beta1.ApplyStrategyTypeClientSideApply - } - - if w.Spec.ApplyStrategy.Type == placementv1beta1.ApplyStrategyTypeServerSideApply && w.Spec.ApplyStrategy.ServerSideApplyConfig == nil { - w.Spec.ApplyStrategy.ServerSideApplyConfig = &placementv1beta1.ServerSideApplyConfig{ - ForceConflicts: false, - } - } + SetDefaultsApplyStrategy(w.Spec.ApplyStrategy) } diff --git a/pkg/utils/defaulter/work_test.go b/pkg/utils/defaulter/work_test.go index a2cbca363..7bfdc0f4a 100644 --- a/pkg/utils/defaulter/work_test.go +++ b/pkg/utils/defaulter/work_test.go @@ -26,7 +26,12 @@ func TestSetDefaultsWork(t *testing.T) { }, want: placementv1beta1.Work{ Spec: placementv1beta1.WorkSpec{ - ApplyStrategy: &placementv1beta1.ApplyStrategy{Type: placementv1beta1.ApplyStrategyTypeClientSideApply}, + ApplyStrategy: &placementv1beta1.ApplyStrategy{ + Type: placementv1beta1.ApplyStrategyTypeClientSideApply, + ComparisonOption: placementv1beta1.ComparisonOptionTypePartialComparison, + WhenToApply: placementv1beta1.WhenToApplyTypeAlways, + WhenToTakeOver: placementv1beta1.WhenToTakeOverTypeAlways, + }, }, }, }, @@ -39,7 +44,12 @@ func TestSetDefaultsWork(t *testing.T) { }, want: placementv1beta1.Work{ Spec: placementv1beta1.WorkSpec{ - ApplyStrategy: &placementv1beta1.ApplyStrategy{Type: placementv1beta1.ApplyStrategyTypeClientSideApply}, + ApplyStrategy: &placementv1beta1.ApplyStrategy{ + Type: placementv1beta1.ApplyStrategyTypeClientSideApply, + ComparisonOption: placementv1beta1.ComparisonOptionTypePartialComparison, + WhenToApply: placementv1beta1.WhenToApplyTypeAlways, + WhenToTakeOver: placementv1beta1.WhenToTakeOverTypeAlways, + }, }, }, }, @@ -47,13 +57,18 @@ func TestSetDefaultsWork(t *testing.T) { name: "nil server side apply config", work: placementv1beta1.Work{ Spec: placementv1beta1.WorkSpec{ - ApplyStrategy: &placementv1beta1.ApplyStrategy{Type: placementv1beta1.ApplyStrategyTypeServerSideApply}, + ApplyStrategy: &placementv1beta1.ApplyStrategy{ + Type: placementv1beta1.ApplyStrategyTypeServerSideApply, + 
}, }, }, want: placementv1beta1.Work{ Spec: placementv1beta1.WorkSpec{ ApplyStrategy: &placementv1beta1.ApplyStrategy{ Type: placementv1beta1.ApplyStrategyTypeServerSideApply, + ComparisonOption: placementv1beta1.ComparisonOptionTypePartialComparison, + WhenToApply: placementv1beta1.WhenToApplyTypeAlways, + WhenToTakeOver: placementv1beta1.WhenToTakeOverTypeAlways, ServerSideApplyConfig: &placementv1beta1.ServerSideApplyConfig{ForceConflicts: false}, }, }, @@ -65,6 +80,9 @@ func TestSetDefaultsWork(t *testing.T) { Spec: placementv1beta1.WorkSpec{ ApplyStrategy: &placementv1beta1.ApplyStrategy{ Type: placementv1beta1.ApplyStrategyTypeServerSideApply, + ComparisonOption: placementv1beta1.ComparisonOptionTypePartialComparison, + WhenToApply: placementv1beta1.WhenToApplyTypeAlways, + WhenToTakeOver: placementv1beta1.WhenToTakeOverTypeAlways, ServerSideApplyConfig: &placementv1beta1.ServerSideApplyConfig{ForceConflicts: true}, }, }, @@ -73,6 +91,9 @@ func TestSetDefaultsWork(t *testing.T) { Spec: placementv1beta1.WorkSpec{ ApplyStrategy: &placementv1beta1.ApplyStrategy{ Type: placementv1beta1.ApplyStrategyTypeServerSideApply, + ComparisonOption: placementv1beta1.ComparisonOptionTypePartialComparison, + WhenToApply: placementv1beta1.WhenToApplyTypeAlways, + WhenToTakeOver: placementv1beta1.WhenToTakeOverTypeAlways, ServerSideApplyConfig: &placementv1beta1.ServerSideApplyConfig{ForceConflicts: true}, }, }, diff --git a/pkg/scheduler/framework/parallelizer/errorflag.go b/pkg/utils/parallelizer/errorflag.go similarity index 100% rename from pkg/scheduler/framework/parallelizer/errorflag.go rename to pkg/utils/parallelizer/errorflag.go diff --git a/pkg/scheduler/framework/parallelizer/errorflag_test.go b/pkg/utils/parallelizer/errorflag_test.go similarity index 100% rename from pkg/scheduler/framework/parallelizer/errorflag_test.go rename to pkg/utils/parallelizer/errorflag_test.go diff --git a/pkg/scheduler/framework/parallelizer/parallelizer.go b/pkg/utils/parallelizer/parallelizer.go similarity index 100% rename from pkg/scheduler/framework/parallelizer/parallelizer.go rename to pkg/utils/parallelizer/parallelizer.go diff --git a/pkg/scheduler/framework/parallelizer/parallelizer_test.go b/pkg/utils/parallelizer/parallelizer_test.go similarity index 100% rename from pkg/scheduler/framework/parallelizer/parallelizer_test.go rename to pkg/utils/parallelizer/parallelizer_test.go diff --git a/test/apis/v1alpha1/zz_generated.deepcopy.go b/test/apis/v1alpha1/zz_generated.deepcopy.go index 0b5d2e30b..ef7e4433a 100644 --- a/test/apis/v1alpha1/zz_generated.deepcopy.go +++ b/test/apis/v1alpha1/zz_generated.deepcopy.go @@ -10,7 +10,7 @@ Licensed under the MIT license. package v1alpha1 import ( - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" )