Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable work generator to process apply strategy updates + DiffReported condition #1016

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
6 changes: 4 additions & 2 deletions apis/placement/v1beta1/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,10 @@ const (
// - "Unknown" means we haven't finished the apply yet so that we cannot check the resource availability.
ResourceBindingAvailable ResourceBindingConditionType = "Available"

// ResourceBindingDiffReported indicates whether Fleet has successfully reported configuration
// differences between the hub cluster and a member cluster for the given resources.
// ResourceBindingDiffReported indicates that Fleet has successfully reported configuration
// differences between the hub cluster and a specific member cluster for the given resources.
//
// This condition is added only when the ReportDiff apply strategy is used.
//
// It can have the following condition statuses:
// * True: Fleet has successfully reported configuration differences for all resources.
Expand Down
132 changes: 126 additions & 6 deletions pkg/controllers/workgenerator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req controllerruntime.Reques
// Though the bound binding is not taking the latest resourceSnapshot, we still need to reconcile the works.
if !condition.IsConditionStatusFalse(rolloutStartedCondition, resourceBinding.Generation) &&
!condition.IsConditionStatusTrue(rolloutStartedCondition, resourceBinding.Generation) {
// The rollout controller is still in the processing of updating the condition
// The rollout controller is still in the process of updating the condition.
//
// Note that running this branch would also skip the refreshing of apply strategies;
// it will resume once the rollout controller updates the rollout started condition.
klog.V(2).InfoS("Requeue the resource binding until the rollout controller finishes updating the status", "resourceBinding", bindingRef, "generation", resourceBinding.Generation, "rolloutStartedCondition", rolloutStartedCondition)
return controllerruntime.Result{Requeue: true}, nil
}
Expand Down Expand Up @@ -382,6 +385,32 @@ func (r *Reconciler) listAllWorksAssociated(ctx context.Context, resourceBinding
func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1beta1.ClusterResourceBinding, existingWorks map[string]*fleetv1beta1.Work, cluster *clusterv1beta1.MemberCluster) (bool, bool, error) {
updateAny := atomic.NewBool(false)
resourceBindingRef := klog.KObj(resourceBinding)

// Refresh the apply strategy for all existing works.
//
// This step is performed separately from other refreshes as apply strategy changes are
// CRP-scoped and independent from the resource snapshot management mechanism. In other
// words, even if a work has become stranded (i.e., it is linked to a resource snapshot that
// is no longer present in the system), it should still be able to receive the latest apply
// strategy update.
errs, cctx := errgroup.WithContext(ctx)
for workName := range existingWorks {
w := existingWorks[workName]
errs.Go(func() error {
updated, err := r.syncApplyStrategy(ctx, resourceBinding, w)
if err != nil {
return err
}
if updated {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will revert the applied condition on the binding. It seems that it's fine but please think through.

updateAny.Store(true)
}
return nil
})
}
if updateErr := errs.Wait(); updateErr != nil {
return false, false, updateErr
}

// the hash256 function can handle empty list https://go.dev/play/p/_4HW17fooXM
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// the hash256 function can can handle empty list https://go.dev/play/p/_4HW17fooXM
// the hash256 function can handle empty list https://go.dev/play/p/_4HW17fooXM

resourceOverrideSnapshotHash, err := resource.HashOf(resourceBinding.Spec.ResourceOverrideSnapshots)
if err != nil {
Expand Down Expand Up @@ -420,7 +449,7 @@ func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1be

// issue all the create/update requests for the corresponding works for each snapshot in parallel
activeWork := make(map[string]*fleetv1beta1.Work, len(resourceSnapshots))
errs, cctx := errgroup.WithContext(ctx)
errs, cctx = errgroup.WithContext(ctx)
// generate work objects for each resource snapshot
for i := range resourceSnapshots {
snapshot := resourceSnapshots[i]
Expand Down Expand Up @@ -515,6 +544,28 @@ func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1be
return true, updateAny.Load(), nil
}

// syncApplyStrategy syncs the apply strategy specified on a ClusterResourceBinding object
// to a Work object.
//
// It returns true if an update was attempted on the Work object (regardless of whether the
// update succeeded), and false if the apply strategies were already in sync.
func (r *Reconciler) syncApplyStrategy(
	ctx context.Context,
	resourceBinding *fleetv1beta1.ClusterResourceBinding,
	existingWork *fleetv1beta1.Work,
) (bool, error) {
	workRef := klog.KObj(existingWork)
	bindingRef := klog.KObj(resourceBinding)

	// No-op if the apply strategy on the work already matches the one on the binding.
	if equality.Semantic.DeepEqual(existingWork.Spec.ApplyStrategy, resourceBinding.Spec.ApplyStrategy) {
		return false, nil
	}

	// Copy the binding's apply strategy over and push the change to the work object.
	existingWork.Spec.ApplyStrategy = resourceBinding.Spec.ApplyStrategy.DeepCopy()
	if err := r.Client.Update(ctx, existingWork); err != nil {
		klog.ErrorS(err, "Failed to update the apply strategy on the work", "work", workRef, "binding", bindingRef)
		return true, controller.NewUpdateIgnoreConflictError(err)
	}
	klog.V(2).InfoS("Successfully updated the apply strategy on the work", "work", workRef, "binding", bindingRef)
	return true, nil
}

// areAllWorkSynced checks if all the works are synced with the resource binding.
func areAllWorkSynced(existingWorks map[string]*fleetv1beta1.Work, resourceBinding *fleetv1beta1.ClusterResourceBinding, _, _ string) bool {
syncedCondition := resourceBinding.GetCondition(string(fleetv1beta1.ResourceBindingWorkSynchronized))
Expand Down Expand Up @@ -700,15 +751,16 @@ func (r *Reconciler) upsertWork(ctx context.Context, newWork, existingWork *flee
} else {
// we already checked the label in fetchAllResourceSnapShots function so no need to check again
resourceIndex, _ := labels.ExtractResourceIndexFromClusterResourceSnapshot(resourceSnapshot)
if workResourceIndex == resourceIndex {
// no need to do anything if the work is generated from the same resource/override snapshots
if workResourceIndex == resourceIndex && equality.Semantic.DeepEqual(existingWork.Spec.ApplyStrategy, newWork.Spec.ApplyStrategy) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also check applyStrategy in the areAllWorkSynced() function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is actually a tricky one; to avoid apply strategy inconsistencies the refresh has to be handled before the check. I will make some edits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this after we did the update of the applyStrategy first regardless?

// no need to do anything if the work is generated from the same resource/override snapshots,
// and the apply strategy remains the same.
if existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] &&
existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] {
klog.V(2).InfoS("Work is associated with the desired resource/override snapshots", "existingROHash", existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation],
klog.V(2).InfoS("Work is associated with the desired resource/override snapshots and apply strategy", "existingROHash", existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

"existingCROHash", existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation], "work", workObj)
return false, nil
}
klog.V(2).InfoS("Work is already associated with the desired resourceSnapshot but still not having the right override snapshots", "resourceIndex", resourceIndex, "work", workObj, "resourceSnapshot", resourceSnapshotObj)
klog.V(2).InfoS("Work is already associated with the desired resourceSnapshot and apply strategy but still not having the right override snapshots", "resourceIndex", resourceIndex, "work", workObj, "resourceSnapshot", resourceSnapshotObj)
}
}
// need to copy the new work to the existing work, only 5 possible changes:
Expand All @@ -723,6 +775,7 @@ func (r *Reconciler) upsertWork(ctx context.Context, newWork, existingWork *flee
existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] = newWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation]
existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] = newWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation]
existingWork.Spec.Workload.Manifests = newWork.Spec.Workload.Manifests
existingWork.Spec.ApplyStrategy = newWork.Spec.ApplyStrategy
if err := r.Client.Update(ctx, existingWork); err != nil {
klog.ErrorS(err, "Failed to update the work associated with the resourceSnapshot", "resourceSnapshot", resourceSnapshotObj, "work", workObj)
return true, controller.NewUpdateIgnoreConflictError(err)
Expand Down Expand Up @@ -757,9 +810,37 @@ func getWorkNamePrefixFromSnapshotName(resourceSnapshot *fleetv1beta1.ClusterRes
// setBindingStatus sets the binding status based on the works associated with the binding.
func setBindingStatus(works map[string]*fleetv1beta1.Work, resourceBinding *fleetv1beta1.ClusterResourceBinding) {
bindingRef := klog.KObj(resourceBinding)

// Note (chenyu1): the work generator will refresh the status of a ClusterResourceBinding with
// the following logic:
//
// a) If the currently active apply strategy (as dictated by the ClusterResourceBinding spec)
// is ClientSideApply or ServerSideApply, the work generator will update the Applied and
// Available conditions (plus the details about failed, diffed, and/or drifted placements)
// in the status, as appropriate; the DiffReported condition will not be updated.
// b) If the currently active apply strategy is ReportDiff, the work generator will update
// the DiffReported condition in the status, plus the details about diffed placements;
// the Applied condition will always be set to False, despite the fact that the work applier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is " the Applied condition will always be set to False" done explicitly?

// will no longer report Applied condition on the Work objects, and the Available condition
// will not be updated.
//
// The always false Applied condition is set to allow the rollout controller to always
// push new resource changes to all bindings, so that users can view configuration differences
// without being blocked by the rollout process.
//
// Note that Fleet will not remove a condition from the status, even if it is no longer updated
// given the current apply strategy; a condition type is added when it is first updated.

// try to gather the resource binding applied status if we didn't update any associated work spec this time
appliedCond := buildAllWorkAppliedCondition(works, resourceBinding)
resourceBinding.SetConditions(appliedCond)

// Set the DiffReported condition if (and only if) a ReportDiff apply strategy is currently
// being used.
if resourceBinding.Spec.ApplyStrategy != nil && resourceBinding.Spec.ApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeReportDiff {
setAllWorkDiffReportedCondition(works, resourceBinding)
}

var availableCond metav1.Condition
// only try to gather the available status if all the work objects are applied
if appliedCond.Status == metav1.ConditionTrue {
Expand Down Expand Up @@ -850,6 +931,9 @@ func buildAllWorkAppliedCondition(works map[string]*fleetv1beta1.Work, binding *
ObservedGeneration: binding.GetGeneration(),
}
}
// In the case where ReportDiff apply strategy is used, work applier will no longer update
// Applied condition on Work objects; since existing Applied conditions will all become stale,
// this function will return a False Applied condition.
ryanzhang-oss marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +934 to +936
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this comment?

return metav1.Condition{
Status: metav1.ConditionFalse,
Type: string(fleetv1beta1.ResourceBindingApplied),
Expand All @@ -859,6 +943,42 @@ func buildAllWorkAppliedCondition(works map[string]*fleetv1beta1.Work, binding *
}
}

// setAllWorkDiffReportedCondition sets the DiffReported condition on a ClusterResourceBinding
// based on the DiffReported conditions on all the related Work objects.
//
// The DiffReported condition of a ClusterResourceBinding object is set to True if and only if
// all the related Work objects have their DiffReported condition set to True.
func setAllWorkDiffReportedCondition(works map[string]*fleetv1beta1.Work, binding *fleetv1beta1.ClusterResourceBinding) {
	allDiffReported := true
	var notDiffReportedWork string
	for _, w := range works {
		// A work counts as diff-reported only if its DiffReported condition is True and
		// observed at the work's current generation (i.e., the condition is not stale).
		if !condition.IsConditionStatusTrue(meta.FindStatusCondition(w.Status.Conditions, fleetv1beta1.WorkConditionTypeDiffReported), w.GetGeneration()) {
			allDiffReported = false
			// Note: map iteration order is random; if multiple works have not reported
			// diff yet, the one named in the message below is picked arbitrarily.
			notDiffReportedWork = w.Name
			break
		}
	}

	if allDiffReported {
		klog.V(2).InfoS("All works associated with the binding have reported diff", "binding", klog.KObj(binding))
		meta.SetStatusCondition(&binding.Status.Conditions, metav1.Condition{
			Status:             metav1.ConditionTrue,
			Type:               string(fleetv1beta1.ResourceBindingDiffReported),
			Reason:             condition.AllWorkDiffReportedReason,
			Message:            "All corresponding work objects have reported diff",
			ObservedGeneration: binding.GetGeneration(),
		})
		return
	}
	meta.SetStatusCondition(&binding.Status.Conditions, metav1.Condition{
		Status: metav1.ConditionFalse,
		Type:   string(fleetv1beta1.ResourceBindingDiffReported),
		Reason: condition.WorkNotDiffReportedReason,
		// Fixed grammar in the user-facing message (was "has failed to reported diff").
		Message:            fmt.Sprintf("Work object %s has failed to report diff", notDiffReportedWork),
		ObservedGeneration: binding.GetGeneration(),
	})
}

func buildAllWorkAvailableCondition(works map[string]*fleetv1beta1.Work, binding *fleetv1beta1.ClusterResourceBinding) metav1.Condition {
allAvailable := true
var notAvailableWork string
Expand Down
Loading
Loading