@@ -23,19 +23,19 @@ import (
23
23
"strings"
24
24
"time"
25
25
26
+ tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
26
27
cloudprovider "github.com/NexusGPU/tensor-fusion/internal/cloudprovider"
27
28
"github.com/NexusGPU/tensor-fusion/internal/cloudprovider/types"
28
- "github.com/NexusGPU/tensor-fusion/internal/utils"
29
-
30
- tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
31
29
"github.com/NexusGPU/tensor-fusion/internal/constants"
30
+ "github.com/NexusGPU/tensor-fusion/internal/utils"
32
31
batchv1 "k8s.io/api/batch/v1"
33
32
corev1 "k8s.io/api/core/v1"
34
33
"k8s.io/apimachinery/pkg/api/errors"
35
34
"k8s.io/apimachinery/pkg/api/resource"
36
35
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37
36
"k8s.io/apimachinery/pkg/runtime"
38
37
"k8s.io/client-go/tools/record"
38
+ "k8s.io/client-go/util/retry"
39
39
"k8s.io/utils/ptr"
40
40
ctrl "sigs.k8s.io/controller-runtime"
41
41
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -457,6 +457,7 @@ func (r *GPUNodeReconciler) reconcileCloudVendorNode(ctx context.Context, node *
457
457
return fmt .Errorf ("failed to unmarshal cloud vendor param: %w, GPUNode: %s" , err , node .Name )
458
458
}
459
459
460
+ // TODO: query cloud vendor by node name
460
461
status , err := provider .CreateNode (ctx , & nodeParam )
461
462
if err != nil {
462
463
return err
@@ -469,11 +470,31 @@ func (r *GPUNodeReconciler) reconcileCloudVendorNode(ctx context.Context, node *
469
470
if err != nil {
470
471
return err
471
472
}
473
+ gpuNode .Status .Phase = tfv1 .TensorFusionGPUNodePhasePending
472
474
gpuNode .Status .NodeInfo .IP = status .PrivateIP
473
475
gpuNode .Status .NodeInfo .InstanceID = status .InstanceID
474
476
gpuNode .Status .NodeInfo .Region = nodeParam .Region
475
- if err := r .Client .Status ().Update (ctx , gpuNode ); err != nil {
476
- log .FromContext (ctx ).Info ("Failed to update GPUNode status, must terminate node to keep operation atomic" , "name" , nodeParam .NodeName )
477
+
478
+ // Retry status update until success to handle version conflicts
479
+ err = retry .RetryOnConflict (retry .DefaultBackoff , func () error {
480
+ // Get the latest version before attempting an update
481
+ latest := & tfv1.GPUNode {}
482
+ if err := r .Client .Get (ctx , client.ObjectKey {Name : gpuNode .Name }, latest ); err != nil {
483
+ return err
484
+ }
485
+
486
+ // Apply our status updates to the latest version
487
+ latest .Status .Phase = tfv1 .TensorFusionGPUNodePhasePending
488
+ latest .Status .NodeInfo .IP = status .PrivateIP
489
+ latest .Status .NodeInfo .InstanceID = status .InstanceID
490
+ latest .Status .NodeInfo .Region = nodeParam .Region
491
+
492
+ // Attempt to update with the latest version
493
+ return r .Client .Status ().Update (ctx , latest )
494
+ })
495
+
496
+ if err != nil {
497
+ log .FromContext (ctx ).Error (err , "Failed to update GPUNode status after retries, must terminate node to keep operation atomic" , "name" , nodeParam .NodeName )
477
498
errTerminate := provider .TerminateNode (ctx , & types.NodeIdentityParam {
478
499
InstanceID : status .InstanceID ,
479
500
Region : nodeParam .Region ,
0 commit comments