Skip to content

Commit 785c9b7

Browse files
authored
fix: add destroying phase (#55)
* fix: add destroying phase * fix: unit test bug, nil exception check
1 parent e4916bb commit 785c9b7

5 files changed

+80
-30
lines changed

internal/controller/gpunode_controller.go

+21-8
Original file line numberDiff line numberDiff line change
@@ -73,46 +73,59 @@ func (r *GPUNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
7373
return ctrl.Result{}, err
7474
}
7575

76-
deleted, err := utils.HandleFinalizer(ctx, node, r.Client, func(ctx context.Context, node *tfv1.GPUNode) error {
76+
deleted, err := utils.HandleFinalizer(ctx, node, r.Client, func(ctx context.Context, node *tfv1.GPUNode) (bool, error) {
77+
78+
if node.Status.Phase != tfv1.TensorFusionGPUNodePhaseDestroying {
79+
node.Status.Phase = tfv1.TensorFusionGPUNodePhaseDestroying
80+
if err := r.Status().Update(ctx, node); err != nil {
81+
return false, err
82+
}
83+
}
84+
7785
if node.Spec.ManageMode == tfv1.GPUNodeManageModeAutoSelect {
7886
// Do nothing, but if it's managed by Karpenter, should come up with some way to tell Karpenter to terminate the GPU node
7987
} else if node.Spec.ManageMode == tfv1.GPUNodeManageModeProvisioned {
8088
clusterName := node.GetLabels()[constants.LabelKeyClusterOwner]
8189
cluster := &tfv1.TensorFusionCluster{}
8290
if err := r.Get(ctx, client.ObjectKey{Name: clusterName}, cluster); err != nil {
83-
return err
91+
92+
if errors.IsNotFound(err) {
93+
r.Recorder.Eventf(node, corev1.EventTypeWarning, "OrphanedNode", "provisioned node not found, this could result in orphaned nodes, please check manually: %s", node.Name)
94+
return true, nil
95+
}
96+
return false, err
8497
}
8598

8699
vendorCfg := cluster.Spec.ComputingVendor
87100
if vendorCfg == nil {
88-
return fmt.Errorf("failed to get computing vendor config for cluster %s", clusterName)
101+
return false, fmt.Errorf("failed to get computing vendor config for cluster %s", clusterName)
89102
}
90103

91104
provider, err := cloudprovider.GetProvider(*vendorCfg)
92105
if err != nil {
93-
return err
106+
return false, err
94107
}
95108

96109
if node.Status.NodeInfo.InstanceID == "" {
97110
r.Recorder.Eventf(node, corev1.EventTypeWarning, "OrphanedNode", "provisioned node without instanceID, this could result in orphaned nodes, please check manually: %s", node.Name)
98-
return nil
111+
return false, nil
99112
}
100113
err = (*provider).TerminateNode(ctx, &types.NodeIdentityParam{
101114
InstanceID: node.Status.NodeInfo.InstanceID,
102115
Region: node.Status.NodeInfo.Region,
103116
})
104117
if err != nil {
105-
return err
118+
return false, err
106119
}
107120

108121
}
109-
return nil
122+
return true, nil
110123
})
111124
if err != nil {
112125
return ctrl.Result{}, err
113126
}
114127
if deleted {
115-
return ctrl.Result{}, nil
128+
return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
116129
}
117130

118131
var poolName string

internal/controller/gpupool_controller.go

+31-6
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,25 @@ func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
7575
}
7676

7777
// TODO: if phase is destroying, stop all existing workers and hypervisors, stop time series flow aggregations
78-
deleted, err := utils.HandleFinalizer(ctx, pool, r.Client, func(ctx context.Context, pool *tfv1.GPUPool) error {
79-
// TODO: stop all existing components
80-
return nil
78+
deleted, err := utils.HandleFinalizer(ctx, pool, r.Client, func(ctx context.Context, pool *tfv1.GPUPool) (bool, error) {
79+
if pool.Status.Phase != tfv1.TensorFusionPoolPhaseDestroying {
80+
pool.Status.Phase = tfv1.TensorFusionPoolPhaseDestroying
81+
if err := r.Status().Update(ctx, pool); err != nil {
82+
return false, err
83+
}
84+
}
85+
// check if all nodes has been deleted
86+
nodes := &tfv1.GPUNodeList{}
87+
if err := r.Client.List(ctx, nodes, client.MatchingLabels{constants.LabelKeyOwner: pool.Name}); err != nil {
88+
return false, err
89+
}
90+
return len(nodes.Items) == 0, nil
8191
})
8292
if err != nil {
8393
return ctrl.Result{}, err
8494
}
8595
if deleted {
86-
return ctrl.Result{}, nil
96+
return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
8797
}
8898

8999
if err := r.reconcilePoolCurrentCapacityAndReadiness(ctx, pool); err != nil {
@@ -163,12 +173,27 @@ func (r *GPUPoolReconciler) reconcilePoolCurrentCapacityAndReadiness(ctx context
163173
pool.Status.VirtualTFlops = virtualTFlops
164174
pool.Status.VirtualVRAM = virtualVRAM
165175

166-
if readyNodes == len(nodes.Items) && readyNodes != 0 {
176+
allowScaleToZero := true
177+
if pool.Spec.CapacityConfig != nil && pool.Spec.CapacityConfig.MinResources != nil {
178+
minTFlops, _ := pool.Spec.CapacityConfig.MinResources.TFlops.AsInt64()
179+
minVRAM, _ := pool.Spec.CapacityConfig.MinResources.VRAM.AsInt64()
180+
181+
allowScaleToZero = minTFlops == 0 && minVRAM == 0
182+
}
183+
184+
allNodesReady := readyNodes == len(nodes.Items)
185+
if allNodesReady && readyNodes == 0 {
186+
if !allowScaleToZero {
187+
allNodesReady = false
188+
}
189+
}
190+
191+
if allNodesReady {
167192
pool.Status.Phase = tfv1.TensorFusionPoolPhaseRunning
168193
log.Info("Pool is running, all nodes are ready", "name", pool.Name, "nodes", len(nodes.Items))
169194
} else {
170195
// set back to updating, wait GPUNode change triggering the pool change
171-
pool.Status.Phase = tfv1.TensorFusionPoolPhasePending
196+
pool.Status.Phase = tfv1.TensorFusionPoolPhaseUpdating
172197
}
173198

174199
if err := r.Client.Status().Update(ctx, pool); err != nil {

internal/controller/tensorfusioncluster_controller.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,25 @@ func (r *TensorFusionClusterReconciler) Reconcile(ctx context.Context, req ctrl.
9191
}
9292
originalStatus := tfc.Status.DeepCopy()
9393

94-
deleted, err := utils.HandleFinalizer(ctx, tfc, r.Client, func(context context.Context, tfc *tfv1.TensorFusionCluster) error {
94+
deleted, err := utils.HandleFinalizer(ctx, tfc, r.Client, func(context context.Context, tfc *tfv1.TensorFusionCluster) (bool, error) {
9595
log.Info("TensorFusionCluster is being deleted", "name", tfc.Name)
96-
return nil
96+
if tfc.Status.Phase != tfv1.TensorFusionClusterDestroying {
97+
tfc.Status.Phase = tfv1.TensorFusionClusterDestroying
98+
if err := r.Status().Update(ctx, tfc); err != nil {
99+
return false, err
100+
}
101+
}
102+
var poolList tfv1.GPUPoolList
103+
if err := r.List(ctx, &poolList, client.MatchingLabels{constants.LabelKeyOwner: tfc.Name}); err != nil {
104+
return false, err
105+
}
106+
return len(poolList.Items) == 0, nil
97107
})
98108
if err != nil {
99109
return ctrl.Result{}, err
100110
}
101111
if deleted {
102-
return ctrl.Result{}, nil
112+
return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
103113
}
104114

105115
if tfc.Status.Phase == "" || tfc.Status.Phase == constants.PhaseUnknown {

internal/controller/tensorfusionconnection_controller.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
7373
return ctrl.Result{}, err
7474
}
7575
if deleted {
76-
// Object is being deleted, no need to proceed scheduling and other actions
77-
return ctrl.Result{}, nil
76+
return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
7877
}
7978

8079
var gpu *tfv1.GPU
@@ -171,27 +170,27 @@ func (r *TensorFusionConnectionReconciler) tryStartWorker(
171170
}
172171

173172
// handleDeletion handles cleanup of external dependencies
174-
func (r *TensorFusionConnectionReconciler) handleDeletion(ctx context.Context, connection *tfv1.TensorFusionConnection) error {
173+
func (r *TensorFusionConnectionReconciler) handleDeletion(ctx context.Context, connection *tfv1.TensorFusionConnection) (bool, error) {
175174
if connection.Status.GPU == "" {
176-
return nil // No gpu was allocated, nothing to clean up
175+
return true, nil // No gpu was allocated, nothing to clean up
177176
}
178177

179178
// Get the gpu
180179
gpu := &tfv1.GPU{}
181180
if err := r.Get(ctx, client.ObjectKey{Name: connection.Status.GPU}, gpu); err != nil {
182181
if errors.IsNotFound(err) {
183182
// gpu is already gone, nothing to do
184-
return nil
183+
return true, nil
185184
}
186-
return err
185+
return false, err
187186
}
188187

189188
// Release the resources
190189
if err := r.Scheduler.Release(connection.Spec.Resources.Requests, gpu); err != nil {
191-
return err
190+
return false, err
192191
}
193192

194-
return r.mustUpdateTFConnectionStatus(ctx, connection, gpu)
193+
return true, r.mustUpdateTFConnectionStatus(ctx, connection, gpu)
195194
}
196195

197196
func (r *TensorFusionConnectionReconciler) mustUpdateTFConnectionStatus(ctx context.Context, connection *tfv1.TensorFusionConnection, gpu *tfv1.GPU) error {

internal/utils/reconcile.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,24 @@ var ErrNextLoop = errors.New("stop this loop and return the associated Result ob
2727
// ErrTerminateLoop is not a real error. It forces the current reconciliation loop to stop
2828
var ErrTerminateLoop = errors.New("stop this loop and do not requeue")
2929

30-
func HandleFinalizer[T client.Object](ctx context.Context, obj T, r client.Client, deleteHook func(context.Context, T) error) (bool, error) {
30+
func HandleFinalizer[T client.Object](ctx context.Context, obj T, r client.Client, deleteHook func(context.Context, T) (bool, error)) (bool, error) {
3131
// Check if object is being deleted
3232
deleted := !obj.GetDeletionTimestamp().IsZero()
3333
if deleted {
3434
// Object is being deleted - process finalizer
3535
if controllerutil.ContainsFinalizer(obj, constants.Finalizer) {
3636
// Run custom deletion hook
37-
if err := deleteHook(ctx, obj); err != nil {
37+
canBeDeleted, err := deleteHook(ctx, obj)
38+
if err != nil {
3839
return false, err
3940
}
4041

4142
// Remove finalizer once cleanup is done
42-
controllerutil.RemoveFinalizer(obj, constants.Finalizer)
43-
if err := r.Update(ctx, obj); err != nil {
44-
return false, err
43+
if canBeDeleted {
44+
controllerutil.RemoveFinalizer(obj, constants.Finalizer)
45+
if err := r.Update(ctx, obj); err != nil {
46+
return false, err
47+
}
4548
}
4649
}
4750
} else {

0 commit comments

Comments
 (0)