Skip to content

Commit

Permalink
Added more scheduler logic
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelawyu committed Jul 3, 2023
1 parent 3c718d4 commit 5384bda
Show file tree
Hide file tree
Showing 7 changed files with 413 additions and 140 deletions.
8 changes: 1 addition & 7 deletions apis/placement/v1beta1/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// SchedulerCleanupFinalizer is a finalizer added by the scheduler to all bindings, to make sure
// that the scheduler can react to binding deletions if necessary.
//
// Its value is fleetPrefix followed by "scheduler-cleanup".
SchedulerCleanupFinalizer = fleetPrefix + "scheduler-cleanup"
)

// +kubebuilder:object:root=true
// +kubebuilder:resource:scope=Cluster,categories={fleet},shortName=rb
// +kubebuilder:subresource:status
Expand Down Expand Up @@ -48,7 +42,7 @@ type ResourceBindingSpec struct {
// it points to the name of the leading snapshot of the index group.
ResourceSnapshotName string `json:"resourceSnapshotName"`

// PolicySnapshtName is the name of the scheduling policy snapshot that this resource binding
// PolicySnapshotName is the name of the scheduling policy snapshot that this resource binding
// points to; more specifically, the scheduler creates this binding in accordance with this
// scheduling policy snapshot.
PolicySnapshotName string `json:"policySnapshotName"`
Expand Down
4 changes: 4 additions & 0 deletions apis/placement/v1beta1/clusterresourceplacement_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

const (
// SchedulerCRPCleanupFinalizer is the finalizer name formed by appending "scheduler-crp-cleanup"
// to fleetPrefix.
//
// NOTE(review): judging by the name, this is presumably the finalizer the scheduler places on
// cluster resource placements (CRPs) so that it can clean up before a CRP is removed; confirm
// against the code that sets and clears it.
SchedulerCRPCleanupFinalizer = fleetPrefix + "scheduler-crp-cleanup"
)

// +genclient
// +genclient:nonNamespaced
// +kubebuilder:object:root=true
Expand Down
79 changes: 33 additions & 46 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/scheduler/framework/parallelizer"
Expand Down Expand Up @@ -239,20 +238,11 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
// * dangling bindings, i.e., bindings that are associated with a cluster that is no longer
// in a normally operating state (the cluster has left the fleet, or is in the state of leaving),
// yet has not been marked as deleting by the scheduler; and
// * deleted bindings, i.e., bindings that are marked for deletion in the API server, but have not
// yet been marked as deleting by the scheduler.
//
// Note that bindings marked as deleting are ignored by the scheduler (unless they are already marked
// for deletion and still have the scheduler cleanup finalizer), as they
// are irrelevant to the scheduling cycle.
active, creating, obsolete, dangling, deleted := classifyBindings(policy, bindings, clusters)

// If a binding has been marked for deletion yet still has the scheduler cleanup finalizer,
// remove the finalizer from it to clear it for GC.
if err := f.removeSchedulerCleanupFinalizerFrom(ctx, deleted); err != nil {
klog.ErrorS(err, "failed to remove scheduler cleanup finalizer from deleted bindings", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
active, creating, obsolete, dangling := classifyBindings(policy, bindings, clusters)

// Mark all dangling bindings as deleting.
if err := f.markAsDeletingFor(ctx, dangling); err != nil {
Expand All @@ -269,10 +259,13 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p

switch policy.Spec.Policy.PlacementType {
case fleetv1beta1.PickAllPlacementType:
return f.runSchedulingCycleForPickAllPlacementType(ctx, state, crpName, policy, policyRef, clusters, active, creating, obsolete)
// Run the scheduling cycle for policy of the PickAll placement type.
return f.runSchedulingCycleForPickAllPlacementType(ctx, state, crpName, policy, clusters, active, creating, obsolete)
case fleetv1beta1.PickNPlacementType:
return f.runSchedulingCycleForPickNPlacementType(ctx, state, crpName, policy, policyRef, clusters, active, creating, obsolete)
// Run the scheduling cycle for policy of the PickN placement type.
return f.runSchedulingCycleForPickNPlacementType(ctx, state, crpName, policy, clusters, active, creating, obsolete)
default:
// This normally should never occur.
klog.ErrorS(err, "unknown placement type", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
}
Expand All @@ -283,10 +276,11 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
func (f *framework) updatePolicySnapshotStatusFrom(
ctx context.Context,
policy *fleetv1beta1.ClusterPolicySnapshot,
policyRef klog.ObjectRef,
filtered []*filteredClusterWithStatus,
existing ...[]*fleetv1beta1.ClusterResourceBinding,
) error {
policyRef := klog.KObj(policy)

// Prepare new scheduling decisions.
newDecisions := newSchedulingDecisionsFrom(f.maxClusterDecisionCount, filtered, existing...)
// Prepare new scheduling condition.
Expand All @@ -309,7 +303,9 @@ func (f *framework) updatePolicySnapshotStatusFrom(
}

// manipulateBindings creates, updates, and deletes bindings.
func (f *framework) manipulateBindings(ctx context.Context, policyRef klog.ObjectRef, toCreate, toUpdate, toDelete []*fleetv1beta1.ClusterResourceBinding) error {
func (f *framework) manipulateBindings(ctx context.Context, policy *fleetv1beta1.ClusterPolicySnapshot, toCreate, toUpdate, toDelete []*fleetv1beta1.ClusterResourceBinding) error {
policyRef := klog.KObj(policy)

// Create new bindings.
if err := f.createBindings(ctx, toCreate); err != nil {
klog.ErrorS(err, "failed to create new bindings", "schedulingPolicySnapshot", policyRef)
Expand All @@ -318,8 +314,9 @@ func (f *framework) manipulateBindings(ctx context.Context, policyRef klog.Objec

// Update existing bindings.
//
// Note that a race condition may arise here, when a rollout controller attempts to update bindings
// at the same time with the scheduler. An error induced requeue will happen in this case.
// A race condition may arise here when a rollout controller attempts to update bindings
// at the same time as the scheduler, e.g., marking a binding as active (from the creating
// state).
if err := f.updateBindings(ctx, toUpdate); err != nil {
klog.ErrorS(err, "failed to update old bindings", "schedulingPolicySnapshot", policyRef)
return err
Expand Down Expand Up @@ -350,9 +347,10 @@ func (f *framework) runAllPluginsForPickAllPlacementType(
ctx context.Context,
state *CycleState,
policy *fleetv1beta1.ClusterPolicySnapshot,
policyRef klog.ObjectRef,
clusters []fleetv1beta1.MemberCluster,
) (scored ScoredClusters, filtered []*filteredClusterWithStatus, err error) {
policyRef := klog.KObj(policy)

// Run pre-filter plugins.
//
// Each plugin can:
Expand Down Expand Up @@ -397,16 +395,17 @@ func (f *framework) runSchedulingCycleForPickAllPlacementType(
state *CycleState,
crpName string,
policy *fleetv1beta1.ClusterPolicySnapshot,
policyRef klog.ObjectRef,
clusters []fleetv1beta1.MemberCluster,
active, creating, obsolete []*fleetv1beta1.ClusterResourceBinding,
) (result ctrl.Result, err error) {
policyRef := klog.KObj(policy)

// The scheduler always needs to take action when processing scheduling policies of the PickAll
// placement type; enter the actual scheduling stages.
klog.V(2).InfoS("Scheduling is needed; entering scheduling stages", "schedulingPolicySnapshot", policyRef)

// Run all plugins needed.
scored, filtered, err := f.runAllPluginsForPickAllPlacementType(ctx, state, policy, policyRef, clusters)
scored, filtered, err := f.runAllPluginsForPickAllPlacementType(ctx, state, policy, clusters)
if err != nil {
klog.ErrorS(err, "failed to run all plugins (pickAll placement type)", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
Expand Down Expand Up @@ -437,14 +436,14 @@ func (f *framework) runSchedulingCycleForPickAllPlacementType(

// Manipulate bindings accordingly.
klog.V(2).InfoS("Creating, updating, and/or deleting bindings", "policy", policyRef)
if err := f.manipulateBindings(ctx, policyRef, toCreate, toUpdate, toDelete); err != nil {
if err := f.manipulateBindings(ctx, policy, toCreate, toUpdate, toDelete); err != nil {
klog.ErrorS(err, "failed to manipulate bindings", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}

// Update policy snapshot status with the latest scheduling decisions and condition.
klog.V(2).InfoS("Updating policy snapshot status", "schedulingPolicySnapshot", policyRef)
if err := f.updatePolicySnapshotStatusFrom(ctx, policy, policyRef, filtered, toCreate, toUpdate, active, creating); err != nil {
if err := f.updatePolicySnapshotStatusFrom(ctx, policy, filtered, toCreate, toUpdate, active, creating); err != nil {
klog.ErrorS(err, "failed to update latest scheduling decisions and condition", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
Expand All @@ -460,11 +459,12 @@ func (f *framework) runAllPluginsForPickNPlacementType(
ctx context.Context,
state *CycleState,
policy *fleetv1beta1.ClusterPolicySnapshot,
policyRef klog.ObjectRef,
numOfClusters int,
activeOrCreatingBindings int,
clusters []fleetv1beta1.MemberCluster,
) (scored ScoredClusters, filtered []*filteredClusterWithStatus, err error) {
policyRef := klog.KObj(policy)

// Calculate the batch size.
//
// Note that obsolete bindings are not counted.
Expand Down Expand Up @@ -554,10 +554,11 @@ func (f *framework) runSchedulingCycleForPickNPlacementType(
state *CycleState,
crpName string,
policy *fleetv1beta1.ClusterPolicySnapshot,
policyRef klog.ObjectRef,
clusters []fleetv1beta1.MemberCluster,
active, creating, obsolete []*fleetv1beta1.ClusterResourceBinding,
) (result ctrl.Result, err error) {
policyRef := klog.KObj(policy)

// Retrieve the desired number of clusters from the policy.
//
// Note that for scheduling policies of the PickN type, this annotation is expected to be present.
Expand Down Expand Up @@ -608,7 +609,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType(
// Note that since there is no reliable way to determine the validity of old decisions added
// to the policy snapshot status, we will only update the status with the known facts, i.e.,
// the clusters that are currently selected.
if err := f.updatePolicySnapshotStatusFrom(ctx, policy, policyRef, nil, active, creating); err != nil {
if err := f.updatePolicySnapshotStatusFrom(ctx, policy, nil, active, creating); err != nil {
klog.ErrorS(err, "failed to update latest scheduling decisions and condition when downscaling", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
Expand All @@ -629,7 +630,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType(
// Note that since there is no reliable way to determine the validity of old decisions added
// to the policy snapshot status, we will only update the status with the known facts, i.e.,
// the clusters that are currently selected.
if err := f.updatePolicySnapshotStatusFrom(ctx, policy, policyRef, nil, active, creating); err != nil {
if err := f.updatePolicySnapshotStatusFrom(ctx, policy, nil, active, creating); err != nil {
klog.ErrorS(err, "failed to update latest scheduling decisions and condition when no scheduling run is needed", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
Expand All @@ -642,7 +643,7 @@ func (f *framework) runSchedulingCycleForPickNPlacementType(
klog.V(2).InfoS("Scheduling is needed; entering scheduling stages", "schedulingPolicySnapshot", policyRef)

// Run all the plugins.
scored, filtered, err := f.runAllPluginsForPickNPlacementType(ctx, state, policy, policyRef, numOfClusters, len(active)+len(creating), clusters)
scored, filtered, err := f.runAllPluginsForPickNPlacementType(ctx, state, policy, numOfClusters, len(active)+len(creating), clusters)
if err != nil {
klog.ErrorS(err, "failed to run all plugins", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
Expand Down Expand Up @@ -688,14 +689,14 @@ func (f *framework) runSchedulingCycleForPickNPlacementType(

// Manipulate bindings accordingly.
klog.V(2).InfoS("Creating, updating, and/or deleting bindings", "policy", policyRef)
if err := f.manipulateBindings(ctx, policyRef, toCreate, toUpdate, toDelete); err != nil {
if err := f.manipulateBindings(ctx, policy, toCreate, toUpdate, toDelete); err != nil {
klog.ErrorS(err, "failed to manipulate bindings", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}

// Update policy snapshot status with the latest scheduling decisions and condition.
klog.V(2).InfoS("Updating policy snapshot status", "schedulingPolicySnapshot", policyRef)
if err := f.updatePolicySnapshotStatusFrom(ctx, policy, policyRef, filtered, toCreate, toUpdate, active, creating); err != nil {
if err := f.updatePolicySnapshotStatusFrom(ctx, policy, filtered, toCreate, toUpdate, active, creating); err != nil {
klog.ErrorS(err, "failed to update latest scheduling decisions and condition", "schedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -744,28 +745,14 @@ func (f *framework) collectBindings(ctx context.Context, crpName string) ([]flee
return bindingList.Items, nil
}

// removeSchedulerCleanupFinalizerFrom strips the scheduler cleanup finalizer from every binding
// in the given list and writes each change back through the client.
//
// It stops at the first update that fails and returns that failure wrapped as an API server
// error; if every update succeeds it returns nil.
func (f *framework) removeSchedulerCleanupFinalizerFrom(ctx context.Context, bindings []*fleetv1beta1.ClusterResourceBinding) error {
	for idx := range bindings {
		b := bindings[idx]
		controllerutil.RemoveFinalizer(b, fleetv1beta1.SchedulerCleanupFinalizer)

		updateErr := f.client.Update(ctx, b, &client.UpdateOptions{})
		if updateErr == nil {
			continue
		}
		// Bail out on the first failure; bindings after this one are left untouched.
		return controller.NewAPIServerError(fmt.Errorf("failed to remove scheduler finalizer from binding %s: %w", b.Name, updateErr))
	}
	return nil
}

// markAsDeletingFor marks a list of bindings as deleting.
func (f *framework) markAsDeletingFor(ctx context.Context, bindings []*fleetv1beta1.ClusterResourceBinding) error {
errorFormat := "failed to mark binding %s as deleting: %w"

for _, binding := range bindings {
// Deleting is a terminal state for a binding; since this is a point of no return, and the
// scheduler has acknowledged its fate at this moment, it is safe for the scheduler to remove
// the scheduler cleanup finalizer from any binding that is in the deleting state.
controllerutil.RemoveFinalizer(binding, fleetv1beta1.SchedulerCleanupFinalizer)
// Note that Deleting is a terminal state for a binding; since this is a point of no return, and the
// scheduler has acknowledged its fate at this moment, any binding marked as deleting will be
// disregarded by the scheduler from this point onwards.
binding.Spec.State = fleetv1beta1.BindingStateDeleting
if err := f.client.Update(ctx, binding, &client.UpdateOptions{}); err != nil {
return controller.NewAPIServerError(fmt.Errorf(errorFormat, binding.Name, err))
Expand Down
Loading

0 comments on commit 5384bda

Please sign in to comment.