Commit d9de057

fix: subscribe the pool's nodeselector in the nodecontroller (#49)
1 parent c1424d9 commit d9de057

File tree

7 files changed, +45 -16 lines changed


cmd/insights/main.go (-5)

This file was deleted.

internal/cloudprovider/alibaba/ecs.go (-1)

@@ -119,7 +119,6 @@ func (p AlibabaGPUNodeProvider) CreateNode(ctx context.Context, param *types.Nod
 		return nil, fmt.Errorf("instance creation failed: %s", response.String())
 	}
 
-	// TODO, SHOULD use request id to check instance status util created
 	return &types.GPUNodeStatus{
 		InstanceID: response.RequestId,
 		CreatedAt:  time.Now(),

internal/controller/gpu_controller.go (-1)

@@ -43,7 +43,6 @@ type GPUReconciler struct {
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
 func (r *GPUReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	// TODO: Calculate tflops and update capacity here
 	return ctrl.Result{}, nil
 }
 

internal/controller/gpupool_compaction_controller.go (+4 -1)

@@ -70,7 +70,10 @@ func (r *GPUPoolCompactionReconciler) checkNodeCompaction(ctx context.Context, p
 			// Not is in-using, should not be terminated
 			continue
 		}
-
+		if gpuNode.Status.Phase != tfv1.TensorFusionGPUNodePhaseRunning {
+			// Not running, should not be terminated
+			continue
+		}
 		// Protect new nodes at least 5 minutes to avoid flapping
 		if gpuNode.CreationTimestamp.Time.After(time.Now().Add(newNodeProtectionDuration)) {
 			continue
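
The new guard means only GPUNodes that have fully reached the Running phase are candidates for termination; nodes still provisioning or pending are no longer compacted out from under the bootstrap process. A minimal sketch of the resulting check order, using hypothetical local stand-ins for the tfv1 types (the real phase constants and status fields live in the operator's api/v1 package):

package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins for the tfv1 API types used above; the real
// definitions live in github.com/NexusGPU/tensor-fusion-operator/api/v1.
type gpuNodePhase string

const phaseRunning gpuNodePhase = "Running"

type gpuNode struct {
	inUse     bool
	phase     gpuNodePhase
	createdAt time.Time
}

const newNodeProtectionDuration = 5 * time.Minute

// eligibleForCompaction mirrors the guard order in checkNodeCompaction:
// in-use nodes, non-Running nodes, and freshly created nodes all survive.
func eligibleForCompaction(n gpuNode, now time.Time) bool {
	if n.inUse {
		return false // in use, should not be terminated
	}
	if n.phase != phaseRunning {
		return false // not running yet (e.g. provisioning), should not be terminated
	}
	if n.createdAt.After(now.Add(-newNodeProtectionDuration)) {
		return false // created within the last 5 minutes, protected from flapping
	}
	return true
}

func main() {
	fresh := gpuNode{phase: phaseRunning, createdAt: time.Now()}
	old := gpuNode{phase: phaseRunning, createdAt: time.Now().Add(-time.Hour)}
	fmt.Println(eligibleForCompaction(fresh, time.Now()), eligibleForCompaction(old, time.Now())) // false true
}

Note the sketch subtracts the protection window from the current time; the committed code adds it (CreationTimestamp.Time.After(time.Now().Add(newNodeProtectionDuration)) is only true for nodes created in the future), which may be worth a second look.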

internal/controller/gpupool_controller.go (+3 -4)

@@ -65,8 +65,9 @@ func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		return ctrl.Result{}, err
 	}
 
+	// TODO: if phase is destroying, stop all existing workers and hypervisors, stop time series flow aggregations
 	deleted, err := utils.HandleFinalizer(ctx, pool, r.Client, func(ctx context.Context, pool *tfv1.GPUPool) error {
-		// TODO: stop all existing workers and hypervisors, stop time series flow aggregations
+		// TODO: stop all existing components
		return nil
 	})
 	if err != nil {

@@ -101,9 +102,6 @@ func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		}
 	}
 
-	// if !isProvisioningMode {
-	// TODO: move GPUNode CR creation here, rather than node_controller
-	// }
 	// TODO, when componentConfig changed, it should notify corresponding resource to upgrade
 	// eg. when hypervisor changed, should change all owned GPUNode's status.phase to Updating
 

@@ -172,5 +170,6 @@ func (r *GPUPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&tfv1.GPUPool{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
 		Named("gpupool").
+		Owns(&tfv1.GPUNode{}).
 		Complete(r)
 }
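
The new Owns(&tfv1.GPUNode{}) registration makes the GPUPool controller re-reconcile a pool whenever one of its GPUNodes changes. That mapping only works for GPUNodes carrying a controller owner reference to the pool, which the operator is expected to set when creating them. A minimal sketch of that wiring, assuming a reconciler with the usual Client and Scheme fields (the type and helper names here are illustrative, not the repository's actual code):

package controller

import (
	"context"

	tfv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// poolReconciler is a minimal stand-in for GPUPoolReconciler, just enough
// to show the owner-reference wiring that Owns(&tfv1.GPUNode{}) relies on.
type poolReconciler struct {
	Client client.Client
	Scheme *runtime.Scheme
}

// createOwnedGPUNode stamps the pool as the node's controller owner before
// creating it, so GPUNode events are mapped back to the owning GPUPool.
func (r *poolReconciler) createOwnedGPUNode(ctx context.Context, pool *tfv1.GPUPool, node *tfv1.GPUNode) error {
	if err := controllerutil.SetControllerReference(pool, node, r.Scheme); err != nil {
		return err
	}
	return r.Client.Create(ctx, node)
}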

internal/controller/gpupool_node_provision.go (-2)

@@ -20,8 +20,6 @@ import (
 )
 
 // Controller and trigger logic for abstract layer of node provisioning
-// TODO: implement the logic
-
 func (r *GPUPoolReconciler) reconcilePoolCapacityWithProvisioner(ctx context.Context, pool *tfv1.GPUPool) (bool, error) {
 	log := log.FromContext(ctx)
 	// check if min resource constraint is satisfied
internal/controller/node_controller.go (+38 -2)

@@ -24,6 +24,7 @@ import (
 
 	tfv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
 	"github.com/NexusGPU/tensor-fusion-operator/internal/constants"
+	"github.com/NexusGPU/tensor-fusion-operator/internal/utils"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"

@@ -33,8 +34,11 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	schedulingcorev1 "k8s.io/component-helpers/scheduling/corev1"
 )
@@ -95,7 +99,14 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
 		return ctrl.Result{}, err
 	}
 	if !matched {
-		log.Info("No matched GPU pool found, skip reconcile the Node", "node", node.Name, "labels", node.Labels)
+		// delete gpunode if no matched pool
+		if err := r.Client.Delete(ctx, &tfv1.GPUNode{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: node.Name,
+			},
+		}); err != nil {
+			return ctrl.Result{}, fmt.Errorf("can not delete gpuNode(%s) : %w", node.Name, err)
+		}
 		return ctrl.Result{}, nil
 	}
 
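One caveat in this new delete path: if the GPUNode was never created for this Node, or is already gone, Client.Delete returns a NotFound error, which as written bubbles up as a reconcile failure and a retry. A hedged variant (a suggestion, not part of the commit) that tolerates the missing object via controller-runtime's client.IgnoreNotFound, already importable from the client package shown above:

		// delete gpunode if no matched pool; a missing GPUNode is fine here,
		// so only surface errors other than NotFound
		if err := r.Client.Delete(ctx, &tfv1.GPUNode{
			ObjectMeta: metav1.ObjectMeta{
				Name: node.Name,
			},
		}); client.IgnoreNotFound(err) != nil {
			return ctrl.Result{}, fmt.Errorf("can not delete gpuNode(%s): %w", node.Name, err)
		}
		return ctrl.Result{}, nil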

@@ -169,11 +180,36 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	if err != nil {
 		return fmt.Errorf("unable to create predicate: %w", err)
 	}
+
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&corev1.Node{}, builder.WithPredicates(p)).
 		Named("node").
+		Watches(&tfv1.GPUPool{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
+			nodelist := &tfv1.GPUNodeList{}
+			if err := mgr.GetClient().List(ctx, nodelist, client.MatchingLabels{
+				selectors[0]: selectors[1],
+			}); err != nil {
+				log.FromContext(ctx).Error(err, "failed to list GPUNode")
+				return []reconcile.Request{}
+			}
+			var requests []reconcile.Request
+			for _, n := range nodelist.Items {
+				requests = append(requests, reconcile.Request{NamespacedName: client.ObjectKey{Name: n.Name}})
+			}
+			return requests
+		}), builder.WithPredicates(predicate.Funcs{
+			UpdateFunc: func(e event.UpdateEvent) bool {
+				oldObj, ok1 := e.ObjectOld.(*tfv1.GPUPool)
+				newObj, ok2 := e.ObjectNew.(*tfv1.GPUPool)
+				if !ok1 || !ok2 {
+					return false
+				}
+				oldNodeSelector := oldObj.Spec.NodeManagerConfig.NodeSelector
+				newNodeSelector := newObj.Spec.NodeManagerConfig.NodeSelector
+				return utils.GetObjectHash(oldNodeSelector) != utils.GetObjectHash(newNodeSelector)
+			},
+		})).
 		Complete(r)
-	// TODO: When Pool changed, all nodes should re-generated, delete not matched ones, this logic should be added into GPUPool controller
 }
 
 func getMatchedPoolName(node *corev1.Node, poolList []tfv1.GPUPool) (*tfv1.GPUPool, bool, error) {
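
The update predicate compares utils.GetObjectHash of the old and new spec.nodeManagerConfig.nodeSelector, so the pool watch only fans GPUNode reconciles out when the selector actually changed, and other pool churn is filtered. The helper's implementation isn't part of this diff; a plausible sketch (an assumption, not the repository's actual code) is a stable hash over the JSON encoding:

package utils

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
)

// GetObjectHash returns a stable fingerprint for any JSON-serializable
// value. NOTE: guessed sketch of the helper referenced by the predicate
// above; the real implementation lives in internal/utils and may differ.
func GetObjectHash(obj any) string {
	// encoding/json sorts map keys, so equal selectors hash identically
	b, err := json.Marshal(obj)
	if err != nil {
		return ""
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}

With that shape, two pool updates that leave the nodeSelector untouched produce identical hashes and the event is dropped before any GPUNode is re-enqueued.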
