Commit fdbfac8

[feat][plugin] support creating RayCluster with config file

as described in the [Ray Kubectl Plugin 1.4.0 Wishlist][1].

closes #3142

Signed-off-by: David Xia <[email protected]>

[1]: https://docs.google.com/document/d/18V5_7SGUzS0LGlaJB7xus04omyr3drKkK4vV6Kz14jY/edit?tab=t.0#heading=h.tuqyq1b6b2x7
1 parent 87c5541 commit fdbfac8
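
At a high level, the change routes --file input through a parse -> validate -> generate pipeline in the generation package. Below is a minimal sketch of how those pieces fit together, assuming only the ParseConfigFile, ValidateConfig, and GenerateRayClusterApplyConfig calls shown in the diff; the standalone main wrapper and file name are illustrative, and error handling is shortened.

package main

import (
	"fmt"
	"log"

	"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/generation"
)

func main() {
	// Parse the user-supplied YAML into a RayClusterConfig.
	cfg, err := generation.ParseConfigFile("ray-cluster-config.yaml")
	if err != nil {
		log.Fatalf("failed to parse config file: %v", err)
	}

	// Reject configs that fail validation before touching the cluster.
	if err := generation.ValidateConfig(cfg); err != nil {
		log.Fatalf("failed to validate config file: %v", err)
	}

	// Convert the config into the apply configuration that the plugin
	// submits via server-side apply in Run().
	if ac := cfg.GenerateRayClusterApplyConfig(); ac != nil {
		fmt.Println("generated RayCluster apply configuration")
	}
}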

File tree

6 files changed, +1470 -253 lines changed


kubectl-plugin/pkg/cmd/create/create_cluster.go

+179 -68
@@ -5,17 +5,20 @@ import (
 	"fmt"
 	"time"
 
+	"k8s.io/utils/ptr"
+
 	"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
 	"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/generation"
 	"github.com/spf13/cobra"
+	"github.com/spf13/pflag"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
-	"k8s.io/utils/ptr"
 
 	"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 	"k8s.io/kubectl/pkg/util/templates"
 
+	rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
 	rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
 )
 
@@ -28,6 +31,7 @@ type CreateClusterOptions struct {
 	headRayStartParams     map[string]string
 	headNodeSelectors      map[string]string
 	workerNodeSelectors    map[string]string
+	rayClusterConfig       *generation.RayClusterConfig
 	namespace              string
 	clusterName            string
 	rayVersion             string
@@ -40,6 +44,7 @@ type CreateClusterOptions struct {
 	workerMemory           string
 	workerEphemeralStorage string
 	workerGPU              string
+	configFile             string
 	workerReplicas         int32
 	dryRun                 bool
 	wait                   bool
@@ -58,6 +63,9 @@ var (
 
 	# Create a Ray cluster with K8s labels and annotations
 	kubectl ray create cluster sample-cluster --labels app=ray,env=dev --annotations ttl-hours=24,owner=chthulu
+
+	# Create a Ray cluster from a YAML configuration file
+	kubectl ray create cluster sample-cluster --file ray-cluster-config.yaml
 `, util.RayVersion, util.RayImage))
 )
 
@@ -80,7 +88,7 @@ func NewCreateClusterCommand(cmdFactory cmdutil.Factory, streams genericclioptio
 			if err := options.Complete(cmd, args); err != nil {
 				return err
 			}
-			if err := options.Validate(); err != nil {
+			if err := options.Validate(cmd); err != nil {
 				return err
 			}
 
@@ -93,26 +101,27 @@ func NewCreateClusterCommand(cmdFactory cmdutil.Factory, streams genericclioptio
 		},
 	}
 
+	cmd.Flags().StringToStringVar(&options.labels, "labels", nil, "K8s labels (e.g. --labels app=ray,env=dev)")
+	cmd.Flags().StringToStringVar(&options.annotations, "annotations", nil, "K8s annotations (e.g. --annotations ttl-hours=24,owner=chthulu)")
 	cmd.Flags().StringVar(&options.rayVersion, "ray-version", util.RayVersion, "Ray version to use")
 	cmd.Flags().StringVar(&options.image, "image", fmt.Sprintf("rayproject/ray:%s", options.rayVersion), "container image to use")
-	cmd.Flags().StringVar(&options.headCPU, "head-cpu", "2", "number of CPUs in the Ray head")
-	cmd.Flags().StringVar(&options.headMemory, "head-memory", "4Gi", "amount of memory in the Ray head")
-	cmd.Flags().StringVar(&options.headGPU, "head-gpu", "0", "number of GPUs in the Ray head")
-	cmd.Flags().StringVar(&options.headEphemeralStorage, "head-ephemeral-storage", "", "amount of ephemeral storage in the Ray head")
+	cmd.Flags().StringVar(&options.headCPU, "head-cpu", util.DefaultHeadCPU, "number of CPUs in the Ray head")
+	cmd.Flags().StringVar(&options.headMemory, "head-memory", util.DefaultHeadMemory, "amount of memory in the Ray head")
+	cmd.Flags().StringVar(&options.headGPU, "head-gpu", util.DefaultHeadGPU, "number of GPUs in the Ray head")
+	cmd.Flags().StringVar(&options.headEphemeralStorage, "head-ephemeral-storage", util.DefaultHeadEphemeralStorage, "amount of ephemeral storage in the Ray head")
 	cmd.Flags().StringToStringVar(&options.headRayStartParams, "head-ray-start-params", options.headRayStartParams, "a map of arguments to the Ray head's 'ray start' entrypoint, e.g. '--head-ray-start-params dashboard-host=0.0.0.0,num-cpus=2'")
-	cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "desired worker group replicas")
-	cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "number of CPUs in each worker group replica")
-	cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "amount of memory in each worker group replica")
-	cmd.Flags().StringVar(&options.workerGPU, "worker-gpu", "0", "number of GPUs in each worker group replica")
-	cmd.Flags().StringVar(&options.workerEphemeralStorage, "worker-ephemeral-storage", "", "amount of ephemeral storage in each worker group replica")
+	cmd.Flags().StringToStringVar(&options.headNodeSelectors, "head-node-selectors", nil, "Node selectors to apply to all head pods in the cluster (e.g. --head-node-selectors cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
+	cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", util.DefaultWorkerReplicas, "desired worker group replicas")
+	cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", util.DefaultWorkerCPU, "number of CPUs in each worker group replica")
+	cmd.Flags().StringVar(&options.workerMemory, "worker-memory", util.DefaultWorkerMemory, "amount of memory in each worker group replica")
+	cmd.Flags().StringVar(&options.workerGPU, "worker-gpu", util.DefaultWorkerGPU, "number of GPUs in each worker group replica")
+	cmd.Flags().StringVar(&options.workerEphemeralStorage, "worker-ephemeral-storage", util.DefaultWorkerEphemeralStorage, "amount of ephemeral storage in each worker group replica")
 	cmd.Flags().StringToStringVar(&options.workerRayStartParams, "worker-ray-start-params", options.workerRayStartParams, "a map of arguments to the Ray workers' 'ray start' entrypoint, e.g. '--worker-ray-start-params metrics-export-port=8080,num-cpus=2'")
+	cmd.Flags().StringToStringVar(&options.workerNodeSelectors, "worker-node-selectors", nil, "Node selectors to apply to all worker pods in the cluster (e.g. --worker-node-selectors cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
+	cmd.Flags().StringVar(&options.configFile, "file", "", "path to a YAML file containing Ray cluster configuration")
 	cmd.Flags().BoolVar(&options.dryRun, "dry-run", false, "print the generated YAML instead of creating the cluster")
 	cmd.Flags().BoolVar(&options.wait, "wait", false, "wait for the cluster to be provisioned before returning. Returns an error if the cluster is not provisioned by the timeout specified")
 	cmd.Flags().DurationVar(&options.timeout, "timeout", defaultProvisionedTimeout, "the timeout for --wait")
-	cmd.Flags().StringToStringVar(&options.headNodeSelectors, "head-node-selectors", nil, "Node selectors to apply to all head pods in the cluster (e.g. --head-node-selectors cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
-	cmd.Flags().StringToStringVar(&options.workerNodeSelectors, "worker-node-selectors", nil, "Node selectors to apply to all worker pods in the cluster (e.g. --worker-node-selectors cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
-	cmd.Flags().StringToStringVar(&options.labels, "labels", nil, "K8s labels (e.g. --labels app=ray,env=dev)")
-	cmd.Flags().StringToStringVar(&options.annotations, "annotations", nil, "K8s annotations (e.g. --annotations ttl-hours=24,owner=chthulu)")
 
 	return cmd
 }
@@ -124,10 +133,6 @@ func (options *CreateClusterOptions) Complete(cmd *cobra.Command, args []string)
 	}
 	options.namespace = namespace
 
-	if options.namespace == "" {
-		options.namespace = "default"
-	}
-
 	if len(args) != 1 {
 		return cmdutil.UsageErrorf(cmd, "%s", cmd.Use)
 	}
@@ -140,63 +145,138 @@ func (options *CreateClusterOptions) Complete(cmd *cobra.Command, args []string)
 	return nil
 }
 
-func (options *CreateClusterOptions) Validate() error {
-	resourceFields := map[string]string{
-		"head-cpu":                 options.headCPU,
-		"head-gpu":                 options.headGPU,
-		"head-memory":              options.headMemory,
-		"head-ephemeral-storage":   options.headEphemeralStorage,
-		"worker-cpu":               options.workerCPU,
-		"worker-gpu":               options.workerGPU,
-		"worker-memory":            options.workerMemory,
-		"worker-ephemeral-storage": options.workerEphemeralStorage,
-	}
-
-	for name, value := range resourceFields {
-		if (name == "head-ephemeral-storage" || name == "worker-ephemeral-storage") && value == "" {
-			continue
+func (options *CreateClusterOptions) Validate(cmd *cobra.Command) error {
+	if options.configFile != "" {
+		if err := flagsIncompatibleWithConfigFilePresent(cmd); err != nil {
+			return err
 		}
-		if err := util.ValidateResourceQuantity(value, name); err != nil {
-			return fmt.Errorf("%w", err)
+
+		// If a cluster config file is provided, check it can be parsed into a RayClusterConfig object
+		rayClusterConfig, err := generation.ParseConfigFile(options.configFile)
+		if err != nil {
+			return fmt.Errorf("failed to parse config file: %w", err)
+		}
+
+		if err := generation.ValidateConfig(rayClusterConfig); err != nil {
+			return fmt.Errorf("failed to validate config file: %w", err)
+		}
+
+		// store the returned RayClusterConfig object for use in Run()
+		options.rayClusterConfig = rayClusterConfig
+	} else {
+		resourceFields := map[string]string{
+			"head-cpu":                 options.headCPU,
+			"head-gpu":                 options.headGPU,
+			"head-memory":              options.headMemory,
+			"head-ephemeral-storage":   options.headEphemeralStorage,
+			"worker-cpu":               options.workerCPU,
+			"worker-gpu":               options.workerGPU,
+			"worker-memory":            options.workerMemory,
+			"worker-ephemeral-storage": options.workerEphemeralStorage,
+		}
+
+		for name, value := range resourceFields {
+			if (name == "head-ephemeral-storage" || name == "worker-ephemeral-storage") && value == "" {
+				continue
+			}
+			if err := util.ValidateResourceQuantity(value, name); err != nil {
+				return fmt.Errorf("%w", err)
+			}
 		}
 	}
 
 	return nil
 }
 
+// resolveClusterName resolves the cluster name from the CLI flag and the config file
+func (options *CreateClusterOptions) resolveClusterName() (string, error) {
+	var name string
+
+	if options.rayClusterConfig.Name != nil && *options.rayClusterConfig.Name != "" && options.clusterName != "" && options.clusterName != *options.rayClusterConfig.Name {
+		return "", fmt.Errorf("the cluster name in the config file %q does not match the cluster name %q. You must use the same name to perform this operation", *options.rayClusterConfig.Name, options.clusterName)
+	}
+
+	if options.clusterName != "" {
+		name = options.clusterName
+	} else if options.rayClusterConfig.Name != nil && *options.rayClusterConfig.Name != "" {
+		name = *options.rayClusterConfig.Name
+	} else {
+		return "", fmt.Errorf("the cluster name is required")
+	}
+
+	return name, nil
+}
+
+// resolveNamespace resolves the namespace from the CLI flag and the config file
+func (options *CreateClusterOptions) resolveNamespace() (string, error) {
+	namespace := "default"
+
+	if options.rayClusterConfig.Namespace != nil && *options.rayClusterConfig.Namespace != "" && options.namespace != "" && options.namespace != *options.rayClusterConfig.Namespace {
+		return "", fmt.Errorf("the namespace in the config file %q does not match the namespace %q. You must pass --namespace=%s to perform this operation", *options.rayClusterConfig.Namespace, options.namespace, *options.rayClusterConfig.Namespace)
+	}
+
+	if options.namespace != "" {
+		namespace = options.namespace
+	} else if options.rayClusterConfig.Namespace != nil && *options.rayClusterConfig.Namespace != "" {
+		namespace = *options.rayClusterConfig.Namespace
+	}
+
+	return namespace, nil
+}
+
 func (options *CreateClusterOptions) Run(ctx context.Context, k8sClient client.Client) error {
-	if clusterExists(k8sClient.RayClient(), options.namespace, options.clusterName) {
-		return fmt.Errorf("the Ray cluster %s in namespace %s already exists", options.clusterName, options.namespace)
-	}
-
-	rayClusterSpecObject := generation.RayClusterSpecObject{
-		Namespace:            &options.namespace,
-		Name:                 &options.clusterName,
-		Labels:               options.labels,
-		Annotations:          options.annotations,
-		RayVersion:           &options.rayVersion,
-		Image:                &options.image,
-		HeadCPU:              &options.headCPU,
-		HeadMemory:           &options.headMemory,
-		HeadEphemeralStorage: &options.headEphemeralStorage,
-		HeadGPU:              &options.headGPU,
-		HeadRayStartParams:   options.headRayStartParams,
-		HeadNodeSelectors:    options.headNodeSelectors,
-		WorkerGroups: []generation.WorkerGroupConfig{
-			{
-				Name:                   ptr.To("default-group"),
-				WorkerReplicas:         &options.workerReplicas,
-				WorkerCPU:              &options.workerCPU,
-				WorkerMemory:           &options.workerMemory,
-				WorkerEphemeralStorage: &options.workerEphemeralStorage,
-				WorkerGPU:              &options.workerGPU,
-				WorkerRayStartParams:   options.workerRayStartParams,
-				WorkerNodeSelectors:    options.workerNodeSelectors,
+	var rayClusterac *rayv1ac.RayClusterApplyConfiguration
+	var err error
+
+	// If options.rayClusterConfig is set, use it exclusively because it means the user provided a config file
+	if options.rayClusterConfig != nil {
+		name, err := options.resolveClusterName()
+		if err != nil {
+			return err
+		}
+		options.rayClusterConfig.Name = &name
+
+		namespace, err := options.resolveNamespace()
+		if err != nil {
+			return err
+		}
+		options.rayClusterConfig.Namespace = &namespace
+	} else {
+		if options.namespace == "" {
+			options.namespace = "default"
+		}
+
+		options.rayClusterConfig = &generation.RayClusterConfig{
+			Namespace:   &options.namespace,
+			Name:        &options.clusterName,
+			Labels:      options.labels,
+			Annotations: options.annotations,
+			RayVersion:  &options.rayVersion,
+			Image:       &options.image,
+			Head: &generation.Head{
+				CPU:              &options.headCPU,
+				Memory:           &options.headMemory,
+				EphemeralStorage: &options.headEphemeralStorage,
+				GPU:              &options.headGPU,
+				RayStartParams:   options.headRayStartParams,
+				NodeSelectors:    options.headNodeSelectors,
 			},
-		},
+			WorkerGroups: []generation.WorkerGroup{
+				{
+					Name:             ptr.To("default-group"),
+					Replicas:         options.workerReplicas,
+					CPU:              &options.workerCPU,
+					Memory:           &options.workerMemory,
+					EphemeralStorage: &options.workerEphemeralStorage,
+					GPU:              &options.workerGPU,
+					RayStartParams:   options.workerRayStartParams,
+					NodeSelectors:    options.workerNodeSelectors,
+				},
+			},
+		}
 	}
 
-	rayClusterac := rayClusterSpecObject.GenerateRayClusterApplyConfig()
+	rayClusterac = options.rayClusterConfig.GenerateRayClusterApplyConfig()
 
 	// If dry run is enabled, it will call the YAML converter and print out the YAML
 	if options.dryRun {
@@ -208,16 +288,18 @@ func (options *CreateClusterOptions) Run(ctx context.Context, k8sClient client.C
 		return nil
 	}
 
-	// TODO: Decide whether to save YAML to file or not.
+	if clusterExists(k8sClient.RayClient(), *options.rayClusterConfig.Namespace, *options.rayClusterConfig.Name) {
+		return fmt.Errorf("the Ray cluster %s in namespace %s already exists", *options.rayClusterConfig.Name, *options.rayClusterConfig.Namespace)
+	}
 
-	result, err := k8sClient.RayClient().RayV1().RayClusters(options.namespace).Apply(ctx, rayClusterac, metav1.ApplyOptions{FieldManager: util.FieldManager})
+	result, err := k8sClient.RayClient().RayV1().RayClusters(*options.rayClusterConfig.Namespace).Apply(ctx, rayClusterac, metav1.ApplyOptions{FieldManager: util.FieldManager})
 	if err != nil {
 		return fmt.Errorf("failed to create Ray cluster: %w", err)
 	}
 	fmt.Printf("Created Ray cluster: %s\n", result.GetName())
 
 	if options.wait {
-		err = k8sClient.WaitRayClusterProvisioned(ctx, options.namespace, result.GetName(), options.timeout)
+		err = k8sClient.WaitRayClusterProvisioned(ctx, *options.rayClusterConfig.Namespace, result.GetName(), options.timeout)
 		if err != nil {
 			return err
 		}
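
The resolveClusterName and resolveNamespace helpers added above share one precedence rule: an explicit CLI value wins, the config file fills the gap, and a conflict between the two is an error. The following is a distilled, self-contained sketch of that rule; resolve is a hypothetical helper written for illustration, not part of the plugin.

package main

import (
	"errors"
	"fmt"
)

// resolve applies the same precedence as the plugin's helpers:
// CLI flag first, then config file, and a mismatch is an error.
func resolve(flagValue string, fileValue *string) (string, error) {
	fromFile := fileValue != nil && *fileValue != ""
	if fromFile && flagValue != "" && flagValue != *fileValue {
		return "", fmt.Errorf("flag value %q conflicts with config file value %q", flagValue, *fileValue)
	}
	if flagValue != "" {
		return flagValue, nil
	}
	if fromFile {
		return *fileValue, nil
	}
	return "", errors.New("no value provided")
}

func main() {
	ns := "dev"
	v, err := resolve("", &ns) // flag unset, config file says "dev"
	fmt.Println(v, err)        // dev <nil>
}

One difference in the plugin itself: resolveNamespace falls back to "default" when neither source sets a namespace, while resolveClusterName treats a missing name as fatal.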
@@ -234,3 +316,32 @@ func clusterExists(client rayclient.Interface, namespace, name string) bool {
 	}
 	return false
 }
+
+// flagsIncompatibleWithConfigFilePresent returns an error if there are command-line flags that are incompatible with a config file
+func flagsIncompatibleWithConfigFilePresent(cmd *cobra.Command) error {
+	incompatibleFlagsUsed := []string{}
+
+	// Define which flags are allowed to be used with --file.
+	// These are typically flags that modify the command's behavior but not the cluster configuration.
+	allowedWithFile := map[string]bool{
+		"file":      true,
+		"context":   true,
+		"namespace": true,
+		"dry-run":   true,
+		"wait":      true,
+		"timeout":   true,
+	}
+
+	// Check all flags to see if any incompatible flags are set
+	cmd.Flags().Visit(func(f *pflag.Flag) {
+		if !allowedWithFile[f.Name] {
+			incompatibleFlagsUsed = append(incompatibleFlagsUsed, f.Name)
+		}
+	})
+
+	if len(incompatibleFlagsUsed) > 0 {
+		return fmt.Errorf("the following flags are incompatible with --file: %v", incompatibleFlagsUsed)
+	}
+
+	return nil
+}
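
The flag-compatibility check above leans on a pflag behavior worth noting: FlagSet.Visit only walks flags the user explicitly set, so flags left at their defaults never trip the check. A runnable sketch of the same pattern outside the plugin; the flag set and flag names here are illustrative, not the plugin's own.

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("create-cluster", pflag.ContinueOnError)
	fs.String("file", "", "path to a YAML config file")
	fs.String("worker-cpu", "2", "CPUs per worker replica")
	fs.Bool("dry-run", false, "print YAML instead of creating")

	// Simulate: ... --file cfg.yaml --worker-cpu 4
	_ = fs.Parse([]string{"--file", "cfg.yaml", "--worker-cpu", "4"})

	allowedWithFile := map[string]bool{"file": true, "dry-run": true}
	var incompatible []string
	fs.Visit(func(f *pflag.Flag) { // Visit skips flags left at their defaults
		if !allowedWithFile[f.Name] {
			incompatible = append(incompatible, f.Name)
		}
	})
	if len(incompatible) > 0 {
		fmt.Printf("the following flags are incompatible with --file: %v\n", incompatible)
		// Output: the following flags are incompatible with --file: [worker-cpu]
	}
}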
