
Commit a7d1dc5

[feat][plugin] support creating RayCluster with config file

as described in the [Ray Kubectl Plugin 1.4.0 Wishlist][1].

Signed-off-by: David Xia <[email protected]>

[1]: https://docs.google.com/document/d/18V5_7SGUzS0LGlaJB7xus04omyr3drKkK4vV6Kz14jY/edit?tab=t.0#heading=h.tuqyq1b6b2x7

1 parent cfaabee commit a7d1dc5
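
For context, a minimal sketch of how the new --file path might be exercised end to end. The YAML keys below are assumptions inferred from the RayClusterConfig fields used in this diff (the real schema lives in kubectl-plugin/pkg/util/generation); ParseConfigFile and ValidateConfig are the two entry points the new Validate() calls.

package main

import (
	"fmt"
	"os"

	"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/generation"
)

// Hypothetical config file contents; the key names are assumptions,
// not the canonical schema.
const sampleConfig = `
name: sample-cluster
namespace: default
ray-version: 2.39.0
image: rayproject/ray:2.39.0
`

func main() {
	// ParseConfigFile takes a path, so write the sample to a temp file first.
	f, err := os.CreateTemp("", "ray-cluster-config-*.yaml")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	if _, err := f.WriteString(sampleConfig); err != nil {
		panic(err)
	}
	f.Close()

	// Same parse-then-validate sequence as the new Validate() below.
	cfg, err := generation.ParseConfigFile(f.Name())
	if err != nil {
		panic(fmt.Errorf("failed to parse config file: %w", err))
	}
	if err := generation.ValidateConfig(cfg); err != nil {
		panic(fmt.Errorf("failed to validate config file: %w", err))
	}
	fmt.Printf("parsed cluster %q in namespace %q\n", *cfg.Name, *cfg.Namespace)
}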

File tree

6 files changed: +650 −126 lines changed


kubectl-plugin/pkg/cmd/create/create_cluster.go (+173 −64)
@@ -5,17 +5,20 @@ import (
 	"fmt"
 	"time"

+	"k8s.io/utils/ptr"
+
 	"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
 	"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/generation"
 	"github.com/spf13/cobra"
+	"github.com/spf13/pflag"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
-	"k8s.io/utils/ptr"

 	"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 	"k8s.io/kubectl/pkg/util/templates"

+	rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
 	rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
 )

@@ -26,6 +29,7 @@ type CreateClusterOptions struct {
 	headRayStartParams  map[string]string
 	headNodeSelectors   map[string]string
 	workerNodeSelectors map[string]string
+	rayClusterConfig    *generation.RayClusterConfig
 	namespace           string
 	clusterName         string
 	rayVersion          string
@@ -38,6 +42,7 @@ type CreateClusterOptions struct {
 	workerMemory           string
 	workerEphemeralStorage string
 	workerGPU              string
+	configFile             string
 	workerReplicas         int32
 	dryRun                 bool
 	wait                   bool
@@ -56,6 +61,9 @@ var (

 	# Create a Ray cluster with K8s labels and annotations
 	kubectl ray create cluster sample-cluster --labels app=ray,env=dev --annotations ttl-hours=24,owner=chthulu
+
+	# Create a Ray cluster from a YAML configuration file
+	kubectl ray create cluster sample-cluster --file ray-cluster-config.yaml
 `, util.RayVersion, util.RayImage))
 )

@@ -78,7 +86,7 @@ func NewCreateClusterCommand(cmdFactory cmdutil.Factory, streams genericclioptio
 			if err := options.Complete(cmd, args); err != nil {
 				return err
 			}
-			if err := options.Validate(); err != nil {
+			if err := options.Validate(cmd); err != nil {
 				return err
 			}

@@ -93,22 +101,23 @@ func NewCreateClusterCommand(cmdFactory cmdutil.Factory, streams genericclioptio

 	cmd.Flags().StringVar(&options.rayVersion, "ray-version", util.RayVersion, "Ray version to use")
 	cmd.Flags().StringVar(&options.image, "image", fmt.Sprintf("rayproject/ray:%s", options.rayVersion), "container image to use")
-	cmd.Flags().StringVar(&options.headCPU, "head-cpu", "2", "number of CPUs in the Ray head")
-	cmd.Flags().StringVar(&options.headMemory, "head-memory", "4Gi", "amount of memory in the Ray head")
-	cmd.Flags().StringVar(&options.headGPU, "head-gpu", "0", "number of GPUs in the Ray head")
-	cmd.Flags().StringVar(&options.headEphemeralStorage, "head-ephemeral-storage", "", "amount of ephemeral storage in the Ray head")
+	cmd.Flags().StringVar(&options.headCPU, "head-cpu", util.DefaultHeadCPU, "number of CPUs in the Ray head")
+	cmd.Flags().StringVar(&options.headMemory, "head-memory", util.DefaultHeadMemory, "amount of memory in the Ray head")
+	cmd.Flags().StringVar(&options.headGPU, "head-gpu", util.DefaultHeadGPU, "number of GPUs in the Ray head")
+	cmd.Flags().StringVar(&options.headEphemeralStorage, "head-ephemeral-storage", util.DefaultHeadEphemeralStorage, "amount of ephemeral storage in the Ray head")
 	cmd.Flags().StringToStringVar(&options.headRayStartParams, "head-ray-start-params", options.headRayStartParams, "a map of arguments to the Ray head's 'ray start' entrypoint, e.g. '--head-ray-start-params dashboard-host=0.0.0.0,num-cpus=2'")
-	cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "desired worker group replicas")
-	cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "number of CPUs in each worker group replica")
-	cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "amount of memory in each worker group replica")
-	cmd.Flags().StringVar(&options.workerGPU, "worker-gpu", "0", "number of GPUs in each worker group replica")
-	cmd.Flags().StringVar(&options.workerEphemeralStorage, "worker-ephemeral-storage", "", "amount of ephemeral storage in each worker group replica")
+	cmd.Flags().StringToStringVar(&options.headNodeSelectors, "head-node-selectors", nil, "Node selectors to apply to all head pods in the cluster (e.g. --head-node-selector=cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
+	cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", util.DefaultWorkerReplicas, "desired worker group replicas")
+	cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", util.DefaultWorkerCPU, "number of CPUs in each worker group replica")
+	cmd.Flags().StringVar(&options.workerMemory, "worker-memory", util.DefaultWorkerMemory, "amount of memory in each worker group replica")
+	cmd.Flags().StringVar(&options.workerGPU, "worker-gpu", util.DefaultWorkerGPU, "number of GPUs in each worker group replica")
+	cmd.Flags().StringVar(&options.workerEphemeralStorage, "worker-ephemeral-storage", util.DefaultWorkerEphemeralStorage, "amount of ephemeral storage in each worker group replica")
 	cmd.Flags().StringToStringVar(&options.workerRayStartParams, "worker-ray-start-params", options.workerRayStartParams, "a map of arguments to the Ray workers' 'ray start' entrypoint, e.g. '--worker-ray-start-params metrics-export-port=8080,num-cpus=2'")
+	cmd.Flags().StringToStringVar(&options.workerNodeSelectors, "worker-node-selectors", nil, "Node selectors to apply to all worker pods in the cluster (e.g. --worker-node-selector=cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
+	cmd.Flags().StringVar(&options.configFile, "file", "", "path to a YAML file containing Ray cluster configuration")
 	cmd.Flags().BoolVar(&options.dryRun, "dry-run", false, "print the generated YAML instead of creating the cluster")
 	cmd.Flags().BoolVar(&options.wait, "wait", false, "wait for the cluster to be provisioned before returning. Returns an error if the cluster is not provisioned by the timeout specified")
 	cmd.Flags().DurationVar(&options.timeout, "timeout", defaultProvisionedTimeout, "the timeout for --wait")
-	cmd.Flags().StringToStringVar(&options.headNodeSelectors, "head-node-selectors", nil, "Node selectors to apply to all head pods in the cluster (e.g. --head-node-selectors cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
-	cmd.Flags().StringToStringVar(&options.workerNodeSelectors, "worker-node-selectors", nil, "Node selectors to apply to all worker pods in the cluster (e.g. --worker-node-selectors cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")

 	return cmd
 }
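
The hard-coded flag defaults above move into util constants that this diff references but does not define. A hypothetical reconstruction, assuming the constants simply preserve the old inline defaults removed here:

// Assumed declarations in kubectl-plugin/pkg/util; the names are taken from
// the diff, the values are inferred from the removed hard-coded defaults.
const (
	DefaultHeadCPU                = "2"
	DefaultHeadMemory             = "4Gi"
	DefaultHeadGPU                = "0"
	DefaultHeadEphemeralStorage   = ""
	DefaultWorkerReplicas         = int32(1)
	DefaultWorkerCPU              = "2"
	DefaultWorkerMemory           = "4Gi"
	DefaultWorkerGPU              = "0"
	DefaultWorkerEphemeralStorage = ""
)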
@@ -120,10 +129,6 @@ func (options *CreateClusterOptions) Complete(cmd *cobra.Command, args []string)
 	}
 	options.namespace = namespace

-	if options.namespace == "" {
-		options.namespace = "default"
-	}
-
 	if len(args) != 1 {
 		return cmdutil.UsageErrorf(cmd, "%s", cmd.Use)
 	}
@@ -136,61 +141,134 @@ func (options *CreateClusterOptions) Complete(cmd *cobra.Command, args []string)
 	return nil
 }

-func (options *CreateClusterOptions) Validate() error {
-	resourceFields := map[string]string{
-		"head-cpu":                 options.headCPU,
-		"head-gpu":                 options.headGPU,
-		"head-memory":              options.headMemory,
-		"head-ephemeral-storage":   options.headEphemeralStorage,
-		"worker-cpu":               options.workerCPU,
-		"worker-gpu":               options.workerGPU,
-		"worker-memory":            options.workerMemory,
-		"worker-ephemeral-storage": options.workerEphemeralStorage,
-	}
-
-	for name, value := range resourceFields {
-		if (name == "head-ephemeral-storage" || name == "worker-ephemeral-storage") && value == "" {
-			continue
+func (options *CreateClusterOptions) Validate(cmd *cobra.Command) error {
+	if options.configFile != "" {
+		if err := switchesIncompatibleWithConfigFilePresent(cmd); err != nil {
+			return err
+		}
+
+		// If a cluster config file is provided, check it can be parsed into a RayClusterConfig object
+		rayClusterConfig, err := generation.ParseConfigFile(options.configFile)
+		if err != nil {
+			return fmt.Errorf("failed to parse config file: %w", err)
 		}
-		if err := util.ValidateResourceQuantity(value, name); err != nil {
-			return fmt.Errorf("%w", err)
+
+		if err := generation.ValidateConfig(rayClusterConfig); err != nil {
+			return fmt.Errorf("failed to validate config file: %w", err)
+		}
+
+		// store the returned RayClusterConfig object for use in Run()
+		options.rayClusterConfig = rayClusterConfig
+	} else {
+		resourceFields := map[string]string{
+			"head-cpu":                 options.headCPU,
+			"head-gpu":                 options.headGPU,
+			"head-memory":              options.headMemory,
+			"head-ephemeral-storage":   options.headEphemeralStorage,
+			"worker-cpu":               options.workerCPU,
+			"worker-gpu":               options.workerGPU,
+			"worker-memory":            options.workerMemory,
+			"worker-ephemeral-storage": options.workerEphemeralStorage,
+		}
+
+		for name, value := range resourceFields {
+			if (name == "head-ephemeral-storage" || name == "worker-ephemeral-storage") && value == "" {
+				continue
+			}
+			if err := util.ValidateResourceQuantity(value, name); err != nil {
+				return fmt.Errorf("%w", err)
+			}
 		}
 	}

 	return nil
 }

+// resolveClusterName resolves the cluster name from the CLI flag and the config file
+func (options *CreateClusterOptions) resolveClusterName() (string, error) {
+	var name string
+
+	if options.rayClusterConfig.Name != nil && *options.rayClusterConfig.Name != "" && options.clusterName != "" && options.clusterName != *options.rayClusterConfig.Name {
+		return "", fmt.Errorf("the cluster name in the config file %q does not match the cluster name %q. You must use the same name to perform this operation", *options.rayClusterConfig.Name, options.clusterName)
+	}
+
+	if options.clusterName != "" {
+		name = options.clusterName
+	} else if options.rayClusterConfig.Name != nil && *options.rayClusterConfig.Name != "" {
+		name = *options.rayClusterConfig.Name
+	} else {
+		return "", fmt.Errorf("the cluster name is required")
+	}
+
+	return name, nil
+}
+
+// resolveNamespace resolves the namespace from the CLI flag and the config file
+func (options *CreateClusterOptions) resolveNamespace() (string, error) {
+	namespace := "default"
+
+	if options.rayClusterConfig.Namespace != nil && *options.rayClusterConfig.Namespace != "" && options.namespace != "" && options.namespace != *options.rayClusterConfig.Namespace {
+		return "", fmt.Errorf("the namespace in the config file %q does not match the namespace %q. You must pass --namespace=%s to perform this operation", *options.rayClusterConfig.Namespace, options.namespace, *options.rayClusterConfig.Namespace)
+	}
+
+	if options.namespace != "" {
+		namespace = options.namespace
+	} else if options.rayClusterConfig.Namespace != nil && *options.rayClusterConfig.Namespace != "" {
+		namespace = *options.rayClusterConfig.Namespace
+	}
+
+	return namespace, nil
+}
+
 func (options *CreateClusterOptions) Run(ctx context.Context, k8sClient client.Client) error {
-	if clusterExists(k8sClient.RayClient(), options.namespace, options.clusterName) {
-		return fmt.Errorf("the Ray cluster %s in namespace %s already exists", options.clusterName, options.namespace)
-	}
-
-	rayClusterSpecObject := generation.RayClusterSpecObject{
-		Namespace:            &options.namespace,
-		Name:                 &options.clusterName,
-		RayVersion:           &options.rayVersion,
-		Image:                &options.image,
-		HeadCPU:              &options.headCPU,
-		HeadMemory:           &options.headMemory,
-		HeadEphemeralStorage: &options.headEphemeralStorage,
-		HeadGPU:              &options.headGPU,
-		HeadRayStartParams:   options.headRayStartParams,
-		HeadNodeSelectors:    options.headNodeSelectors,
-		WorkerGroups: []generation.WorkerGroupConfig{
-			{
-				Name:                   ptr.To("default-group"),
-				WorkerReplicas:         &options.workerReplicas,
-				WorkerCPU:              &options.workerCPU,
-				WorkerMemory:           &options.workerMemory,
-				WorkerEphemeralStorage: &options.workerEphemeralStorage,
-				WorkerGPU:              &options.workerGPU,
-				WorkerRayStartParams:   options.workerRayStartParams,
-				WorkerNodeSelectors:    options.workerNodeSelectors,
+	var rayClusterac *rayv1ac.RayClusterApplyConfiguration
+	var err error
+
+	// If options.rayClusterConfig is set, use it exclusively because it means the user provided a config file
+	if options.rayClusterConfig != nil {
+		name, err := options.resolveClusterName()
+		if err != nil {
+			return err
+		}
+		options.rayClusterConfig.Name = &name
+
+		namespace, err := options.resolveNamespace()
+		if err != nil {
+			return err
+		}
+		options.rayClusterConfig.Namespace = &namespace
+	} else {
+		if options.namespace == "" {
+			options.namespace = "default"
+		}
+
+		options.rayClusterConfig = &generation.RayClusterConfig{
+			Namespace:            &options.namespace,
+			Name:                 &options.clusterName,
+			RayVersion:           &options.rayVersion,
+			Image:                &options.image,
+			HeadCPU:              &options.headCPU,
+			HeadMemory:           &options.headMemory,
+			HeadEphemeralStorage: &options.headEphemeralStorage,
+			HeadGPU:              &options.headGPU,
+			HeadRayStartParams:   options.headRayStartParams,
+			HeadNodeSelectors:    options.headNodeSelectors,
+			WorkerGroups: []generation.WorkerGroup{
+				{
+					Name:                   ptr.To("default-group"),
+					WorkerReplicas:         options.workerReplicas,
+					WorkerCPU:              &options.workerCPU,
+					WorkerMemory:           &options.workerMemory,
+					WorkerEphemeralStorage: &options.workerEphemeralStorage,
+					WorkerGPU:              &options.workerGPU,
+					WorkerRayStartParams:   options.workerRayStartParams,
+					WorkerNodeSelectors:    options.workerNodeSelectors,
+				},
 			},
-		},
+		}
 	}

-	rayClusterac := rayClusterSpecObject.GenerateRayClusterApplyConfig()
+	rayClusterac = options.rayClusterConfig.GenerateRayClusterApplyConfig()

 	// If dry run is enabled, it will call the YAML converter and print out the YAML
 	if options.dryRun {
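
resolveClusterName and resolveNamespace above implement the same precedence rule: an explicit CLI value must agree with a non-empty config-file value, the CLI value wins when only one side is set, the config file is the fallback, and only the namespace has a final default. A condensed paraphrase of that shared logic (a sketch, not the actual helpers):

package main

import "fmt"

// resolve is a hypothetical distillation of the precedence both helpers
// share; fallback is "" for the cluster name (forcing an error upstream)
// and "default" for the namespace.
func resolve(flagVal string, cfgVal *string, fallback string) (string, error) {
	cfg := ""
	if cfgVal != nil {
		cfg = *cfgVal
	}
	switch {
	case flagVal != "" && cfg != "" && flagVal != cfg:
		// Both sides set and disagreeing is an error, never a silent override.
		return "", fmt.Errorf("flag value %q conflicts with config file value %q", flagVal, cfg)
	case flagVal != "":
		return flagVal, nil
	case cfg != "":
		return cfg, nil
	default:
		return fallback, nil
	}
}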
@@ -202,16 +280,18 @@ func (options *CreateClusterOptions) Run(ctx context.Context, k8sClient client.C
 		return nil
 	}

-	// TODO: Decide whether to save YAML to file or not.
+	if clusterExists(k8sClient.RayClient(), *options.rayClusterConfig.Namespace, *options.rayClusterConfig.Name) {
+		return fmt.Errorf("the Ray cluster %s in namespace %s already exists", *options.rayClusterConfig.Name, *options.rayClusterConfig.Namespace)
+	}

-	result, err := k8sClient.RayClient().RayV1().RayClusters(options.namespace).Apply(ctx, rayClusterac, metav1.ApplyOptions{FieldManager: util.FieldManager})
+	result, err := k8sClient.RayClient().RayV1().RayClusters(*options.rayClusterConfig.Namespace).Apply(ctx, rayClusterac, metav1.ApplyOptions{FieldManager: util.FieldManager})
 	if err != nil {
 		return fmt.Errorf("failed to create Ray cluster: %w", err)
 	}
 	fmt.Printf("Created Ray cluster: %s\n", result.GetName())

 	if options.wait {
-		err = k8sClient.WaitRayClusterProvisioned(ctx, options.namespace, result.GetName(), options.timeout)
+		err = k8sClient.WaitRayClusterProvisioned(ctx, *options.rayClusterConfig.Namespace, result.GetName(), options.timeout)
 		if err != nil {
 			return err
 		}
@@ -228,3 +308,32 @@ func clusterExists(client rayclient.Interface, namespace, name string) bool {
 	}
 	return false
 }
+
+// switchesIncompatibleWithConfigFilePresent returns an error if there are command-line flags that are incompatible with a config file
+func switchesIncompatibleWithConfigFilePresent(cmd *cobra.Command) error {
+	incompatibleFlagsUsed := []string{}
+
+	// Define which flags are allowed to be used with --file.
+	// These are typically flags that control the command's behavior but not the cluster configuration.
+	allowedWithFile := map[string]bool{
+		"file":      true,
+		"context":   true,
+		"namespace": true,
+		"dry-run":   true,
+		"wait":      true,
+		"timeout":   true,
+	}
+
+	// Check all flags to see if any non-allowed flags were set
+	cmd.Flags().Visit(func(f *pflag.Flag) {
+		if !allowedWithFile[f.Name] {
+			incompatibleFlagsUsed = append(incompatibleFlagsUsed, f.Name)
+		}
+	})
+
+	if len(incompatibleFlagsUsed) > 0 {
+		return fmt.Errorf("the following flags are incompatible with --file: %v", incompatibleFlagsUsed)
+	}
+
+	return nil
+}
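
A detail worth calling out in switchesIncompatibleWithConfigFilePresent: pflag's FlagSet.Visit walks only the flags that were explicitly set on the command line, so flags left at their defaults can never trigger the incompatibility error. A minimal standalone demonstration:

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
	fs.String("image", "rayproject/ray", "container image")
	fs.Bool("dry-run", false, "print YAML only")

	// Only --image is set explicitly; --dry-run stays at its default.
	_ = fs.Parse([]string{"--image=custom:latest"})

	// Visit skips unset flags, so only "image" prints here.
	fs.Visit(func(f *pflag.Flag) {
		fmt.Println("explicitly set:", f.Name)
	})
}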
