
Commit 9e790b9

fix: requeue if schedule gpu fail (#79)
1 parent 45ae865 commit 9e790b9

2 files changed, +12 -6 lines changed


internal/controller/tensorfusionconnection_controller.go (+2)
@@ -86,6 +86,8 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
 		r.Recorder.Eventf(connection, corev1.EventTypeWarning, "WorkerSelectionFailed", "Failed to select worker: %v", err)
 		// Update the status to WorkerPending when worker selection fails
 		connection.Status.Phase = tfv1.WorkerPending
+		connection.Status.WorkerName = ""
+		connection.Status.ConnectionURL = ""
 		if updateErr := r.Status().Update(ctx, connection); updateErr != nil {
 			return ctrl.Result{}, fmt.Errorf("failed to select worker: %w, failed to update status: %v", err, updateErr)
 		}
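The two added lines matter because the phase alone doesn't protect consumers: without them, a connection whose worker re-selection fails would report WorkerPending while still carrying the previous worker's name and URL. A minimal consumer-side sketch of why that helps, assuming the tfv1 types from this diff (the workerEndpoint helper itself is hypothetical, not part of the repo):

// workerEndpoint is a hypothetical helper: after this commit, callers can
// trust that a non-empty ConnectionURL refers to a currently selected
// worker rather than a stale one from a previous reconcile.
func workerEndpoint(conn *tfv1.TensorFusionConnection) (string, bool) {
	if conn.Status.Phase == tfv1.WorkerPending || conn.Status.ConnectionURL == "" {
		return "", false
	}
	return conn.Status.ConnectionURL, true
}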

internal/controller/tensorfusionworkload_controller.go (+10 -6)
@@ -164,8 +164,12 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
 
 			// Calculate how many pods need to be added
 			podsToAdd := int(desiredReplicas - currentReplicas)
-			if err := r.scaleUpWorkers(ctx, workerGenerator, workload, podsToAdd); err != nil {
-				return ctrl.Result{}, err
+			result, err := r.scaleUpWorkers(ctx, workerGenerator, workload, podsToAdd)
+			if err != nil {
+				return ctrl.Result{}, fmt.Errorf("scale up workers: %w", err)
+			}
+			if !result.IsZero() {
+				return result, nil
 			}
 		} else if currentReplicas > desiredReplicas {
 			log.Info("Scaling down workers", "from", currentReplicas, "to", desiredReplicas)
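The !result.IsZero() branch is the idiomatic controller-runtime way to propagate a requeue request from a helper without treating it as an error: a non-zero Result means "come back later", a zero Result means "done". A self-contained sketch of the pattern outside this repo (doWork and the 3-second delay are made up for illustration):

package main

import (
	"fmt"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// doWork stands in for a helper like scaleUpWorkers: it can fail, ask to be
// retried later, or succeed outright.
func doWork(ready bool) (ctrl.Result, error) {
	if !ready {
		// Not an error: ask controller-runtime to reconcile again later.
		return ctrl.Result{RequeueAfter: 3 * time.Second}, nil
	}
	return ctrl.Result{}, nil
}

func main() {
	result, err := doWork(false)
	if err != nil {
		panic(err)
	}
	// A non-zero Result signals a requeue without surfacing a failure.
	fmt.Println("requeue?", !result.IsZero(), "after", result.RequeueAfter)
}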
@@ -306,7 +310,7 @@ func (r *TensorFusionWorkloadReconciler) deletePod(ctx context.Context, pod *cor
 }
 
 // scaleUpWorkers handles the scaling up of worker pods
-func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, workerGenerator *worker.WorkerGenerator, workload *tfv1.TensorFusionWorkload, count int) error {
+func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, workerGenerator *worker.WorkerGenerator, workload *tfv1.TensorFusionWorkload, count int) (ctrl.Result, error) {
 	log := log.FromContext(ctx)
 
 	// Create worker pods
@@ -315,7 +319,7 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
 		gpu, err := r.Scheduler.Schedule(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests)
 		if err != nil {
 			r.Recorder.Eventf(workload, corev1.EventTypeWarning, "ScheduleGPUFailed", "Failed to schedule GPU: %v", err)
-			return fmt.Errorf("schedule GPU: %w", err)
+			return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
 		}
 
 		pod, err := r.tryStartWorker(ctx, workerGenerator, gpu, workload)
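This hunk is the core of the fix: a scheduling miss (for example, no GPU in the pool currently has enough free capacity) is now treated as a transient condition rather than an error. Returning an error would make controller-runtime requeue with exponential backoff and surface a failure; returning a RequeueAfter result with a nil error retries at a fixed cadence, while the ScheduleGPUFailed event still records what happened. The diff doesn't show the constant's definition; a hedged sketch of what it plausibly looks like (the 3-second value is an assumption, not the repo's real one):

package constants

import "time"

// PendingRequeueDuration is the fixed delay before retrying a reconcile when
// a workload is waiting for GPU capacity. Illustrative value only: the real
// definition lives in this repo's constants package and is not in the diff.
const PendingRequeueDuration = 3 * time.Second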
@@ -325,7 +329,7 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
 			if releaseErr != nil {
 				log.Error(releaseErr, "Failed to release GPU after pod creation failure")
 			}
-			return fmt.Errorf("create worker pod: %w", err)
+			return ctrl.Result{}, fmt.Errorf("create worker pod: %w", err)
 		}
 
 		labels := prometheus.Labels{
@@ -339,7 +343,7 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
 		metrics.VramBytesLimit.With(labels).Set(workload.Spec.Resources.Limits.Vram.AsApproximateFloat64())
 	}
 
-	return nil
+	return ctrl.Result{}, nil
 }
 
 // updateStatus updates the WorkerStatuses and readyReplicas field in the workload status
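Putting the hunks together, the post-change shape of scaleUpWorkers is roughly the following (condensed from the diff context; the loop structure is inferred from the "Create worker pods" comment and the count parameter, and the per-pod metrics body is elided):

// Condensed sketch of scaleUpWorkers after this commit.
for i := 0; i < count; i++ {
	gpu, err := r.Scheduler.Schedule(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests)
	if err != nil {
		// Transient: no schedulable GPU right now. Requeue instead of
		// failing; pods created in earlier iterations are kept.
		return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
	}
	pod, err := r.tryStartWorker(ctx, workerGenerator, gpu, workload)
	if err != nil {
		// Real failure: the full code releases the reserved GPU first,
		// then propagates the error with a zero Result.
		return ctrl.Result{}, fmt.Errorf("create worker pod: %w", err)
	}
	_ = pod // per-pod metrics are recorded here (see the prometheus.Labels hunk)
}
return ctrl.Result{}, nil

The asymmetry is deliberate: capacity shortages heal on their own as other workloads release GPUs, so they requeue quietly, while pod-creation failures still surface as operator-visible errors.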
