Skip to content

Commit 372345e

Browse files
authoredJan 28, 2025··
Adds RayCluster.apply() (#778)
- Adds RayCluster.apply() implementation - Adds e2e tests for apply - Adds unit tests for apply
1 parent b5c13dc commit 372345e

File tree

10 files changed

+687
-38
lines changed

10 files changed

+687
-38
lines changed
 

‎CONTRIBUTING.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ pytest -v src/codeflare_sdk
7676

7777
### Local e2e Testing
7878

79-
- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/e2e.md)
79+
- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/sphinx/user-docs/e2e.rst)
8080

8181
#### Code Coverage
8282

‎docs/sphinx/user-docs/ray-cluster-interaction.rst

+6
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ cluster.up()
6666

6767
| The ``cluster.up()`` function creates a Ray Cluster in the given namespace.
6868
69+
cluster.apply()
70+
------------
71+
72+
| The ``cluster.apply()`` function applies a Ray Cluster in the given namespace. If the cluster already exists, it is updated.
73+
| If it does not exist it is created.
74+
6975
cluster.down()
7076
--------------
7177

‎src/codeflare_sdk/common/kueue/test_kueue.py

+125-8
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
from ..utils.unit_test_support import (
15-
apply_template,
1615
get_local_queue,
17-
createClusterConfig,
16+
create_cluster_config,
1817
get_template_variables,
18+
apply_template,
1919
)
2020
from unittest.mock import patch
2121
from codeflare_sdk.ray.cluster.cluster import Cluster, ClusterConfiguration
2222
import yaml
2323
import os
2424
import filecmp
2525
from pathlib import Path
26-
from .kueue import list_local_queues
26+
from .kueue import list_local_queues, local_queue_exists, add_queue_label
2727

2828
parent = Path(__file__).resolve().parents[4] # project directory
2929
aw_dir = os.path.expanduser("~/.codeflare/resources/")
@@ -51,7 +51,7 @@ def test_cluster_creation_no_aw_local_queue(mocker):
5151
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
5252
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
5353
)
54-
config = createClusterConfig()
54+
config = create_cluster_config()
5555
config.name = "unit-test-cluster-kueue"
5656
config.write_to_file = True
5757
config.local_queue = "local-queue-default"
@@ -67,7 +67,7 @@ def test_cluster_creation_no_aw_local_queue(mocker):
6767
assert cluster_kueue == expected_rc
6868

6969
# With resources loaded in memory, no Local Queue specified.
70-
config = createClusterConfig()
70+
config = create_cluster_config()
7171
config.name = "unit-test-cluster-kueue"
7272
config.write_to_file = False
7373
cluster = Cluster(config)
@@ -84,7 +84,7 @@ def test_aw_creation_local_queue(mocker):
8484
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
8585
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
8686
)
87-
config = createClusterConfig()
87+
config = create_cluster_config()
8888
config.name = "unit-test-aw-kueue"
8989
config.appwrapper = True
9090
config.write_to_file = True
@@ -101,7 +101,7 @@ def test_aw_creation_local_queue(mocker):
101101
assert aw_kueue == expected_rc
102102

103103
# With resources loaded in memory, no Local Queue specified.
104-
config = createClusterConfig()
104+
config = create_cluster_config()
105105
config.name = "unit-test-aw-kueue"
106106
config.appwrapper = True
107107
config.write_to_file = False
@@ -120,7 +120,7 @@ def test_get_local_queue_exists_fail(mocker):
120120
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
121121
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
122122
)
123-
config = createClusterConfig()
123+
config = create_cluster_config()
124124
config.name = "unit-test-aw-kueue"
125125
config.appwrapper = True
126126
config.write_to_file = True
@@ -175,6 +175,123 @@ def test_list_local_queues(mocker):
175175
assert lqs == []
176176

177177

178+
def test_local_queue_exists_found(mocker):
179+
# Mock Kubernetes client and list_namespaced_custom_object method
180+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
181+
mock_api_instance = mocker.Mock()
182+
mocker.patch("kubernetes.client.CustomObjectsApi", return_value=mock_api_instance)
183+
mocker.patch("codeflare_sdk.ray.cluster.cluster.config_check")
184+
185+
# Mock return value for list_namespaced_custom_object
186+
mock_api_instance.list_namespaced_custom_object.return_value = {
187+
"items": [
188+
{"metadata": {"name": "existing-queue"}},
189+
{"metadata": {"name": "another-queue"}},
190+
]
191+
}
192+
193+
# Call the function
194+
namespace = "test-namespace"
195+
local_queue_name = "existing-queue"
196+
result = local_queue_exists(namespace, local_queue_name)
197+
198+
# Assertions
199+
assert result is True
200+
mock_api_instance.list_namespaced_custom_object.assert_called_once_with(
201+
group="kueue.x-k8s.io",
202+
version="v1beta1",
203+
namespace=namespace,
204+
plural="localqueues",
205+
)
206+
207+
208+
def test_local_queue_exists_not_found(mocker):
209+
# Mock Kubernetes client and list_namespaced_custom_object method
210+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
211+
mock_api_instance = mocker.Mock()
212+
mocker.patch("kubernetes.client.CustomObjectsApi", return_value=mock_api_instance)
213+
mocker.patch("codeflare_sdk.ray.cluster.cluster.config_check")
214+
215+
# Mock return value for list_namespaced_custom_object
216+
mock_api_instance.list_namespaced_custom_object.return_value = {
217+
"items": [
218+
{"metadata": {"name": "another-queue"}},
219+
{"metadata": {"name": "different-queue"}},
220+
]
221+
}
222+
223+
# Call the function
224+
namespace = "test-namespace"
225+
local_queue_name = "non-existent-queue"
226+
result = local_queue_exists(namespace, local_queue_name)
227+
228+
# Assertions
229+
assert result is False
230+
mock_api_instance.list_namespaced_custom_object.assert_called_once_with(
231+
group="kueue.x-k8s.io",
232+
version="v1beta1",
233+
namespace=namespace,
234+
plural="localqueues",
235+
)
236+
237+
238+
import pytest
239+
from unittest import mock # If you're also using mocker from pytest-mock
240+
241+
242+
def test_add_queue_label_with_valid_local_queue(mocker):
243+
# Mock the kubernetes.client.CustomObjectsApi and its response
244+
mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi")
245+
mock_api_instance.return_value.list_namespaced_custom_object.return_value = {
246+
"items": [
247+
{"metadata": {"name": "valid-queue"}},
248+
]
249+
}
250+
251+
# Mock other dependencies
252+
mocker.patch("codeflare_sdk.common.kueue.local_queue_exists", return_value=True)
253+
mocker.patch(
254+
"codeflare_sdk.common.kueue.get_default_kueue_name",
255+
return_value="default-queue",
256+
)
257+
258+
# Define input item and parameters
259+
item = {"metadata": {}}
260+
namespace = "test-namespace"
261+
local_queue = "valid-queue"
262+
263+
# Call the function
264+
add_queue_label(item, namespace, local_queue)
265+
266+
# Assert that the label is added to the item
267+
assert item["metadata"]["labels"] == {"kueue.x-k8s.io/queue-name": "valid-queue"}
268+
269+
270+
def test_add_queue_label_with_invalid_local_queue(mocker):
271+
# Mock the kubernetes.client.CustomObjectsApi and its response
272+
mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi")
273+
mock_api_instance.return_value.list_namespaced_custom_object.return_value = {
274+
"items": [
275+
{"metadata": {"name": "valid-queue"}},
276+
]
277+
}
278+
279+
# Mock the local_queue_exists function to return False
280+
mocker.patch("codeflare_sdk.common.kueue.local_queue_exists", return_value=False)
281+
282+
# Define input item and parameters
283+
item = {"metadata": {}}
284+
namespace = "test-namespace"
285+
local_queue = "invalid-queue"
286+
287+
# Call the function and expect a ValueError
288+
with pytest.raises(
289+
ValueError,
290+
match="local_queue provided does not exist or is not in this namespace",
291+
):
292+
add_queue_label(item, namespace, local_queue)
293+
294+
178295
# Make sure to always keep this function last
179296
def test_cleanup():
180297
os.remove(f"{aw_dir}unit-test-cluster-kueue.yaml")

‎src/codeflare_sdk/common/utils/unit_test_support.py

+55-11
Original file line numberDiff line numberDiff line change
@@ -29,32 +29,34 @@
2929
aw_dir = os.path.expanduser("~/.codeflare/resources/")
3030

3131

32-
def createClusterConfig():
32+
def create_cluster_config(num_workers=2, write_to_file=False):
3333
config = ClusterConfiguration(
3434
name="unit-test-cluster",
3535
namespace="ns",
36-
num_workers=2,
36+
num_workers=num_workers,
3737
worker_cpu_requests=3,
3838
worker_cpu_limits=4,
3939
worker_memory_requests=5,
4040
worker_memory_limits=6,
4141
appwrapper=True,
42-
write_to_file=False,
42+
write_to_file=write_to_file,
4343
)
4444
return config
4545

4646

47-
def createClusterWithConfig(mocker):
48-
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
49-
mocker.patch(
50-
"kubernetes.client.CustomObjectsApi.get_cluster_custom_object",
51-
return_value={"spec": {"domain": "apps.cluster.awsroute.org"}},
52-
)
53-
cluster = Cluster(createClusterConfig())
47+
def create_cluster(mocker, num_workers=2, write_to_file=False):
48+
cluster = Cluster(create_cluster_config(num_workers, write_to_file))
5449
return cluster
5550

5651

57-
def createClusterWrongType():
52+
def patch_cluster_with_dynamic_client(mocker, cluster, dynamic_client=None):
53+
mocker.patch.object(cluster, "get_dynamic_client", return_value=dynamic_client)
54+
mocker.patch.object(cluster, "down", return_value=None)
55+
mocker.patch.object(cluster, "config_check", return_value=None)
56+
# mocker.patch.object(cluster, "_throw_for_no_raycluster", return_value=None)
57+
58+
59+
def create_cluster_wrong_type():
5860
config = ClusterConfiguration(
5961
name="unit-test-cluster",
6062
namespace="ns",
@@ -412,6 +414,48 @@ def mocked_ingress(port, cluster_name="unit-test-cluster", annotations: dict = N
412414
return mock_ingress
413415

414416

417+
# Global dictionary to maintain state in the mock
418+
cluster_state = {}
419+
420+
421+
# The mock side_effect function for server_side_apply
422+
def mock_server_side_apply(resource, body=None, name=None, namespace=None, **kwargs):
423+
# Simulate the behavior of server_side_apply:
424+
# Update a mock state that represents the cluster's current configuration.
425+
# Stores the state in a global dictionary for simplicity.
426+
427+
global cluster_state
428+
429+
if not resource or not body or not name or not namespace:
430+
raise ValueError("Missing required parameters for server_side_apply")
431+
432+
# Extract worker count from the body if it exists
433+
try:
434+
worker_count = (
435+
body["spec"]["workerGroupSpecs"][0]["replicas"]
436+
if "spec" in body and "workerGroupSpecs" in body["spec"]
437+
else None
438+
)
439+
except KeyError:
440+
worker_count = None
441+
442+
# Apply changes to the cluster_state mock
443+
cluster_state[name] = {
444+
"namespace": namespace,
445+
"worker_count": worker_count,
446+
"body": body,
447+
}
448+
449+
# Return a response that mimics the behavior of a successful apply
450+
return {
451+
"status": "success",
452+
"applied": True,
453+
"name": name,
454+
"namespace": namespace,
455+
"worker_count": worker_count,
456+
}
457+
458+
415459
@patch.dict("os.environ", {"NB_PREFIX": "test-prefix"})
416460
def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Cluster:
417461
mocker.patch(

‎src/codeflare_sdk/common/widgets/test_widgets.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import codeflare_sdk.common.widgets.widgets as cf_widgets
1616
import pandas as pd
1717
from unittest.mock import MagicMock, patch
18-
from ..utils.unit_test_support import get_local_queue, createClusterConfig
18+
from ..utils.unit_test_support import get_local_queue, create_cluster_config
1919
from codeflare_sdk.ray.cluster.cluster import Cluster
2020
from codeflare_sdk.ray.cluster.status import (
2121
RayCluster,
@@ -38,7 +38,7 @@ def test_cluster_up_down_buttons(mocker):
3838
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
3939
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
4040
)
41-
cluster = Cluster(createClusterConfig())
41+
cluster = Cluster(create_cluster_config())
4242

4343
with patch("ipywidgets.Button") as MockButton, patch(
4444
"ipywidgets.Checkbox"

‎src/codeflare_sdk/ray/cluster/cluster.py

+89-8
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,15 @@
5252
import requests
5353

5454
from kubernetes import config
55+
from kubernetes.dynamic import DynamicClient
56+
from kubernetes import client as k8s_client
57+
from kubernetes.client.rest import ApiException
58+
5559
from kubernetes.client.rest import ApiException
5660
import warnings
5761

62+
CF_SDK_FIELD_MANAGER = "codeflare-sdk"
63+
5864

5965
class Cluster:
6066
"""
@@ -84,6 +90,12 @@ def __init__(self, config: ClusterConfiguration):
8490
if is_notebook():
8591
cluster_up_down_buttons(self)
8692

93+
def get_dynamic_client(self): # pragma: no cover
94+
return DynamicClient(get_api_client())
95+
96+
def config_check(self):
97+
return config_check()
98+
8799
@property
88100
def _client_headers(self):
89101
k8_client = get_api_client()
@@ -95,9 +107,7 @@ def _client_headers(self):
95107

96108
@property
97109
def _client_verify_tls(self):
98-
if not _is_openshift_cluster or not self.config.verify_tls:
99-
return False
100-
return True
110+
return _is_openshift_cluster and self.config.verify_tls
101111

102112
@property
103113
def job_client(self):
@@ -121,7 +131,6 @@ def create_resource(self):
121131
Called upon cluster object creation, creates an AppWrapper yaml based on
122132
the specifications of the ClusterConfiguration.
123133
"""
124-
125134
if self.config.namespace is None:
126135
self.config.namespace = get_current_namespace()
127136
if self.config.namespace is None:
@@ -130,7 +139,6 @@ def create_resource(self):
130139
raise TypeError(
131140
f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication."
132141
)
133-
134142
return build_ray_cluster(self)
135143

136144
# creates a new cluster with the provided or default spec
@@ -139,10 +147,11 @@ def up(self):
139147
Applies the Cluster yaml, pushing the resource request onto
140148
the Kueue localqueue.
141149
"""
142-
150+
print(
151+
"WARNING: The up() function is planned for deprecation in favor of apply()."
152+
)
143153
# check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
144154
self._throw_for_no_raycluster()
145-
146155
namespace = self.config.namespace
147156

148157
try:
@@ -176,6 +185,54 @@ def up(self):
176185
except Exception as e: # pragma: no cover
177186
return _kube_api_error_handling(e)
178187

188+
# Applies a new cluster with the provided or default spec
189+
def apply(self, force=False):
190+
"""
191+
Applies the Cluster yaml using server-side apply.
192+
If 'force' is set to True, conflicts will be forced.
193+
"""
194+
# check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
195+
self._throw_for_no_raycluster()
196+
namespace = self.config.namespace
197+
name = self.config.name
198+
try:
199+
self.config_check()
200+
api_instance = client.CustomObjectsApi(get_api_client())
201+
crds = self.get_dynamic_client().resources
202+
if self.config.appwrapper:
203+
api_version = "workload.codeflare.dev/v1beta2"
204+
api_instance = crds.get(api_version=api_version, kind="AppWrapper")
205+
# defaulting body to resource_yaml
206+
body = self.resource_yaml
207+
if self.config.write_to_file:
208+
# if write_to_file is True, load the file from AppWrapper yaml and update body
209+
with open(self.resource_yaml) as f:
210+
aw = yaml.load(f, Loader=yaml.FullLoader)
211+
body = aw
212+
api_instance.server_side_apply(
213+
field_manager=CF_SDK_FIELD_MANAGER,
214+
group="workload.codeflare.dev",
215+
version="v1beta2",
216+
namespace=namespace,
217+
plural="appwrappers",
218+
body=body,
219+
force_conflicts=force,
220+
)
221+
print(
222+
f"AppWrapper: '{name}' configuration has successfully been applied"
223+
)
224+
else:
225+
api_version = "ray.io/v1"
226+
api_instance = crds.get(api_version=api_version, kind="RayCluster")
227+
self._component_resources_apply(
228+
namespace=namespace, api_instance=api_instance
229+
)
230+
print(f"Ray Cluster: '{name}' has successfully been applied")
231+
except AttributeError as e:
232+
raise RuntimeError(f"Failed to initialize DynamicClient: {e}")
233+
except Exception as e: # pragma: no cover
234+
return _kube_api_error_handling(e)
235+
179236
def _throw_for_no_raycluster(self):
180237
api_instance = client.CustomObjectsApi(get_api_client())
181238
try:
@@ -204,7 +261,7 @@ def down(self):
204261
resource_name = self.config.name
205262
self._throw_for_no_raycluster()
206263
try:
207-
config_check()
264+
self.config_check()
208265
api_instance = client.CustomObjectsApi(get_api_client())
209266
if self.config.appwrapper:
210267
api_instance.delete_namespaced_custom_object(
@@ -507,6 +564,16 @@ def _component_resources_up(
507564
else:
508565
_create_resources(self.resource_yaml, namespace, api_instance)
509566

567+
def _component_resources_apply(
568+
self, namespace: str, api_instance: client.CustomObjectsApi
569+
):
570+
if self.config.write_to_file:
571+
with open(self.resource_yaml) as f:
572+
ray_cluster = yaml.safe_load(f)
573+
_apply_ray_cluster(ray_cluster, namespace, api_instance)
574+
else:
575+
_apply_ray_cluster(self.resource_yaml, namespace, api_instance)
576+
510577
def _component_resources_down(
511578
self, namespace: str, api_instance: client.CustomObjectsApi
512579
):
@@ -744,6 +811,20 @@ def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsA
744811
)
745812

746813

814+
def _apply_ray_cluster(
815+
yamls, namespace: str, api_instance: client.CustomObjectsApi, force=False
816+
):
817+
api_instance.server_side_apply(
818+
field_manager=CF_SDK_FIELD_MANAGER,
819+
group="ray.io",
820+
version="v1",
821+
namespace=namespace,
822+
plural="rayclusters",
823+
body=yamls,
824+
force_conflicts=force, # Allow forcing conflicts if needed
825+
)
826+
827+
747828
def _check_aw_exists(name: str, namespace: str) -> bool:
748829
try:
749830
config_check()

‎src/codeflare_sdk/ray/cluster/test_cluster.py

+232-6
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,18 @@
1919
list_all_queued,
2020
)
2121
from codeflare_sdk.common.utils.unit_test_support import (
22-
createClusterWithConfig,
22+
create_cluster,
2323
arg_check_del_effect,
2424
ingress_retrieval,
2525
arg_check_apply_effect,
2626
get_local_queue,
27-
createClusterConfig,
27+
create_cluster_config,
2828
get_ray_obj,
2929
get_obj_none,
3030
get_ray_obj_with_status,
3131
get_aw_obj_with_status,
32+
patch_cluster_with_dynamic_client,
33+
route_list_retrieval,
3234
)
3335
from codeflare_sdk.ray.cluster.cluster import _is_openshift_cluster
3436
from pathlib import Path
@@ -67,11 +69,189 @@ def test_cluster_up_down(mocker):
6769
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
6870
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
6971
)
70-
cluster = cluster = createClusterWithConfig(mocker)
72+
cluster = create_cluster(mocker)
7173
cluster.up()
7274
cluster.down()
7375

7476

77+
def test_cluster_apply_scale_up_scale_down(mocker):
78+
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
79+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
80+
mock_dynamic_client = mocker.Mock()
81+
mocker.patch(
82+
"kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock
83+
)
84+
mocker.patch(
85+
"codeflare_sdk.ray.cluster.cluster.Cluster.create_resource",
86+
return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml",
87+
)
88+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
89+
mocker.patch(
90+
"kubernetes.client.CustomObjectsApi.get_cluster_custom_object",
91+
return_value={"spec": {"domain": "apps.cluster.awsroute.org"}},
92+
)
93+
94+
# Initialize test
95+
initial_num_workers = 1
96+
scaled_up_num_workers = 2
97+
98+
# Step 1: Create cluster with initial workers
99+
cluster = create_cluster(mocker, initial_num_workers)
100+
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
101+
mocker.patch(
102+
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
103+
return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"),
104+
)
105+
cluster.apply()
106+
107+
# Step 2: Scale up the cluster
108+
cluster = create_cluster(mocker, scaled_up_num_workers)
109+
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
110+
cluster.apply()
111+
112+
# Step 3: Scale down the cluster
113+
cluster = create_cluster(mocker, initial_num_workers)
114+
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
115+
cluster.apply()
116+
117+
# Tear down
118+
cluster.down()
119+
120+
121+
def test_cluster_apply_with_file(mocker):
122+
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
123+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
124+
mock_dynamic_client = mocker.Mock()
125+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster")
126+
mocker.patch(
127+
"kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock
128+
)
129+
mocker.patch(
130+
"codeflare_sdk.ray.cluster.cluster.Cluster.create_resource",
131+
return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml",
132+
)
133+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
134+
mocker.patch(
135+
"kubernetes.client.CustomObjectsApi.get_cluster_custom_object",
136+
return_value={"spec": {"domain": "apps.cluster.awsroute.org"}},
137+
)
138+
139+
# Step 1: Create cluster with initial workers
140+
cluster = create_cluster(mocker, 1, write_to_file=True)
141+
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
142+
mocker.patch(
143+
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
144+
return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"),
145+
)
146+
cluster.apply()
147+
# Tear down
148+
cluster.down()
149+
150+
151+
def test_cluster_apply_with_appwrapper(mocker):
152+
# Mock Kubernetes client and dynamic client methods
153+
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
154+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
155+
mocker.patch(
156+
"codeflare_sdk.ray.cluster.cluster._check_aw_exists",
157+
return_value=True,
158+
)
159+
mock_dynamic_client = mocker.Mock()
160+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster")
161+
mocker.patch(
162+
"kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock
163+
)
164+
mocker.patch(
165+
"codeflare_sdk.ray.cluster.cluster.Cluster.create_resource",
166+
return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml",
167+
)
168+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
169+
170+
# Create a cluster configuration with appwrapper set to False
171+
cluster = create_cluster(mocker, 1, write_to_file=False)
172+
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
173+
174+
# Mock listing RayCluster to simulate it doesn't exist
175+
mocker.patch(
176+
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
177+
return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"),
178+
)
179+
# Call the apply method
180+
cluster.apply()
181+
182+
# Assertions
183+
print("Cluster applied without AppWrapper.")
184+
185+
186+
def test_cluster_apply_without_appwrapper_write_to_file(mocker):
187+
# Mock Kubernetes client and dynamic client methods
188+
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
189+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
190+
mocker.patch(
191+
"codeflare_sdk.ray.cluster.cluster._check_aw_exists",
192+
return_value=True,
193+
)
194+
mock_dynamic_client = mocker.Mock()
195+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster")
196+
mocker.patch(
197+
"kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock
198+
)
199+
mocker.patch(
200+
"codeflare_sdk.ray.cluster.cluster.Cluster.create_resource",
201+
return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml",
202+
)
203+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
204+
205+
# Create a cluster configuration with appwrapper set to False
206+
cluster = create_cluster(mocker, 1, write_to_file=True)
207+
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
208+
cluster.config.appwrapper = False
209+
210+
# Mock listing RayCluster to simulate it doesn't exist
211+
mocker.patch(
212+
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
213+
return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"),
214+
)
215+
# Call the apply method
216+
cluster.apply()
217+
218+
# Assertions
219+
print("Cluster applied without AppWrapper.")
220+
221+
222+
def test_cluster_apply_without_appwrapper(mocker):
223+
# Mock Kubernetes client and dynamic client methods
224+
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
225+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
226+
mock_dynamic_client = mocker.Mock()
227+
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster")
228+
mocker.patch(
229+
"kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock
230+
)
231+
mocker.patch(
232+
"codeflare_sdk.ray.cluster.cluster.Cluster.create_resource",
233+
return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml",
234+
)
235+
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
236+
237+
# Create a cluster configuration with appwrapper set to False
238+
cluster = create_cluster(mocker, 1, write_to_file=False)
239+
cluster.config.appwrapper = None
240+
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
241+
242+
# Mock listing RayCluster to simulate it doesn't exist
243+
mocker.patch(
244+
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
245+
return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"),
246+
)
247+
248+
# Call the apply method
249+
cluster.apply()
250+
251+
# Assertions
252+
print("Cluster applied without AppWrapper.")
253+
254+
75255
def test_cluster_up_down_no_mcad(mocker):
76256
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster")
77257
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
@@ -98,7 +278,7 @@ def test_cluster_up_down_no_mcad(mocker):
98278
"kubernetes.client.CustomObjectsApi.list_cluster_custom_object",
99279
return_value={"items": []},
100280
)
101-
config = createClusterConfig()
281+
config = create_cluster_config()
102282
config.name = "unit-test-cluster-ray"
103283
config.appwrapper = False
104284
cluster = Cluster(config)
@@ -117,7 +297,7 @@ def test_cluster_uris(mocker):
117297
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
118298
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
119299
)
120-
cluster = cluster = createClusterWithConfig(mocker)
300+
cluster = create_cluster(mocker)
121301
mocker.patch(
122302
"kubernetes.client.NetworkingV1Api.list_namespaced_ingress",
123303
return_value=ingress_retrieval(
@@ -147,6 +327,52 @@ def test_cluster_uris(mocker):
147327
== "Dashboard not available yet, have you run cluster.up()?"
148328
)
149329

330+
mocker.patch(
331+
"codeflare_sdk.ray.cluster.cluster._is_openshift_cluster", return_value=True
332+
)
333+
mocker.patch(
334+
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
335+
return_value={
336+
"items": [
337+
{
338+
"metadata": {
339+
"name": "ray-dashboard-unit-test-cluster",
340+
},
341+
"spec": {
342+
"host": "ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org",
343+
"tls": {}, # Indicating HTTPS
344+
},
345+
}
346+
]
347+
},
348+
)
349+
cluster = create_cluster(mocker)
350+
assert (
351+
cluster.cluster_dashboard_uri()
352+
== "http://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org"
353+
)
354+
mocker.patch(
355+
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
356+
return_value={
357+
"items": [
358+
{
359+
"metadata": {
360+
"name": "ray-dashboard-unit-test-cluster",
361+
},
362+
"spec": {
363+
"host": "ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org",
364+
"tls": {"termination": "passthrough"}, # Indicating HTTPS
365+
},
366+
}
367+
]
368+
},
369+
)
370+
cluster = create_cluster(mocker)
371+
assert (
372+
cluster.cluster_dashboard_uri()
373+
== "https://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org"
374+
)
375+
150376

151377
def test_ray_job_wrapping(mocker):
152378
import ray
@@ -159,7 +385,7 @@ def ray_addr(self, *args):
159385
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
160386
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
161387
)
162-
cluster = cluster = createClusterWithConfig(mocker)
388+
cluster = create_cluster(mocker)
163389
mocker.patch(
164390
"ray.job_submission.JobSubmissionClient._check_connection_and_version_with_url",
165391
return_value="None",

‎src/codeflare_sdk/ray/cluster/test_config.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
from codeflare_sdk.common.utils.unit_test_support import (
1616
apply_template,
17-
createClusterWrongType,
1817
get_example_extended_storage_opts,
18+
create_cluster_wrong_type,
1919
create_cluster_all_config_params,
2020
get_template_variables,
2121
)
@@ -59,6 +59,7 @@ def test_default_appwrapper_creation(mocker):
5959
assert cluster.resource_yaml == expected_aw
6060

6161

62+
@pytest.mark.filterwarnings("ignore::UserWarning")
6263
def test_config_creation_all_parameters(mocker):
6364
from codeflare_sdk.ray.cluster.config import DEFAULT_RESOURCE_MAPPING
6465

@@ -110,6 +111,7 @@ def test_config_creation_all_parameters(mocker):
110111
)
111112

112113

114+
@pytest.mark.filterwarnings("ignore::UserWarning")
113115
def test_all_config_params_aw(mocker):
114116
create_cluster_all_config_params(mocker, "aw-all-params", True)
115117
assert filecmp.cmp(
@@ -121,11 +123,12 @@ def test_all_config_params_aw(mocker):
121123

122124
def test_config_creation_wrong_type():
123125
with pytest.raises(TypeError) as error_info:
124-
createClusterWrongType()
126+
create_cluster_wrong_type()
125127

126128
assert len(str(error_info.value).splitlines()) == 4
127129

128130

131+
@pytest.mark.filterwarnings("ignore::UserWarning")
129132
def test_cluster_config_deprecation_conversion(mocker):
130133
config = ClusterConfiguration(
131134
name="test",

‎tests/e2e/cluster_apply_kind_test.py

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
from codeflare_sdk import Cluster, ClusterConfiguration
2+
import pytest
3+
from kubernetes import client
4+
5+
from support import (
6+
initialize_kubernetes_client,
7+
create_namespace,
8+
delete_namespace,
9+
get_ray_cluster,
10+
)
11+
12+
13+
@pytest.mark.kind
14+
class TestRayClusterApply:
15+
def setup_method(self):
16+
initialize_kubernetes_client(self)
17+
18+
def teardown_method(self):
19+
delete_namespace(self)
20+
21+
def test_cluster_apply(self):
22+
self.setup_method()
23+
create_namespace(self)
24+
25+
cluster_name = "test-cluster-apply"
26+
namespace = self.namespace
27+
28+
# Initial configuration with 1 worker
29+
initial_config = ClusterConfiguration(
30+
name=cluster_name,
31+
namespace=namespace,
32+
num_workers=1,
33+
head_cpu_requests="500m",
34+
head_cpu_limits="1",
35+
head_memory_requests="1Gi",
36+
head_memory_limits="2Gi",
37+
worker_cpu_requests="500m",
38+
worker_cpu_limits="1",
39+
worker_memory_requests="1Gi",
40+
worker_memory_limits="2Gi",
41+
write_to_file=True,
42+
verify_tls=False,
43+
)
44+
45+
# Create the cluster
46+
cluster = Cluster(initial_config)
47+
cluster.apply()
48+
49+
# Wait for the cluster to be ready
50+
cluster.wait_ready()
51+
status = cluster.status()
52+
assert status["ready"], f"Cluster {cluster_name} is not ready: {status}"
53+
54+
# Verify the cluster is created
55+
ray_cluster = get_ray_cluster(cluster_name, namespace)
56+
assert ray_cluster is not None, "Cluster was not created successfully"
57+
assert (
58+
ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] == 1
59+
), "Initial worker count does not match"
60+
61+
# Update configuration with 3 workers
62+
updated_config = ClusterConfiguration(
63+
name=cluster_name,
64+
namespace=namespace,
65+
num_workers=2,
66+
head_cpu_requests="500m",
67+
head_cpu_limits="1",
68+
head_memory_requests="1Gi",
69+
head_memory_limits="2Gi",
70+
worker_cpu_requests="500m",
71+
worker_cpu_limits="1",
72+
worker_memory_requests="1Gi",
73+
worker_memory_limits="2Gi",
74+
write_to_file=True,
75+
verify_tls=False,
76+
)
77+
78+
# Apply the updated configuration
79+
cluster.config = updated_config
80+
cluster.apply()
81+
82+
# Wait for the updated cluster to be ready
83+
cluster.wait_ready()
84+
updated_status = cluster.status()
85+
assert updated_status[
86+
"ready"
87+
], f"Cluster {cluster_name} is not ready after update: {updated_status}"
88+
89+
# Verify the cluster is updated
90+
updated_ray_cluster = get_ray_cluster(cluster_name, namespace)
91+
assert (
92+
updated_ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] == 2
93+
), "Worker count was not updated"
94+
95+
# Clean up
96+
cluster.down()
97+
ray_cluster = get_ray_cluster(cluster_name, namespace)
98+
assert ray_cluster is None, "Cluster was not deleted successfully"
99+
100+
def test_apply_invalid_update(self):
101+
self.setup_method()
102+
create_namespace(self)
103+
104+
cluster_name = "test-cluster-apply-invalid"
105+
namespace = self.namespace
106+
107+
# Initial configuration
108+
initial_config = ClusterConfiguration(
109+
name=cluster_name,
110+
namespace=namespace,
111+
num_workers=1,
112+
head_cpu_requests="500m",
113+
head_cpu_limits="1",
114+
head_memory_requests="1Gi",
115+
head_memory_limits="2Gi",
116+
worker_cpu_requests="500m",
117+
worker_cpu_limits="1",
118+
worker_memory_requests="1Gi",
119+
worker_memory_limits="2Gi",
120+
write_to_file=True,
121+
verify_tls=False,
122+
)
123+
124+
# Create the cluster
125+
cluster = Cluster(initial_config)
126+
cluster.apply()
127+
128+
# Wait for the cluster to be ready
129+
cluster.wait_ready()
130+
status = cluster.status()
131+
assert status["ready"], f"Cluster {cluster_name} is not ready: {status}"
132+
133+
# Update with an invalid configuration (e.g., immutable field change)
134+
invalid_config = ClusterConfiguration(
135+
name=cluster_name,
136+
namespace=namespace,
137+
num_workers=2,
138+
head_cpu_requests="1",
139+
head_cpu_limits="2", # Changing CPU limits (immutable)
140+
head_memory_requests="1Gi",
141+
head_memory_limits="2Gi",
142+
worker_cpu_requests="500m",
143+
worker_cpu_limits="1",
144+
worker_memory_requests="1Gi",
145+
worker_memory_limits="2Gi",
146+
write_to_file=True,
147+
verify_tls=False,
148+
)
149+
150+
# Try to apply the invalid configuration and expect failure
151+
cluster.config = invalid_config
152+
with pytest.raises(RuntimeError, match="Immutable fields detected"):
153+
cluster.apply()
154+
155+
# Clean up
156+
cluster.down()

‎tests/e2e/support.py

+16
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,22 @@
1111
)
1212

1313

14+
def get_ray_cluster(cluster_name, namespace):
15+
api = client.CustomObjectsApi()
16+
try:
17+
return api.get_namespaced_custom_object(
18+
group="ray.io",
19+
version="v1",
20+
namespace=namespace,
21+
plural="rayclusters",
22+
name=cluster_name,
23+
)
24+
except client.exceptions.ApiException as e:
25+
if e.status == 404:
26+
return None
27+
raise
28+
29+
1430
def get_ray_image():
1531
default_ray_image = "quay.io/modh/ray@sha256:0d715f92570a2997381b7cafc0e224cfa25323f18b9545acfd23bc2b71576d06"
1632
return os.getenv("RAY_IMAGE", default_ray_image)

0 commit comments

Comments
 (0)
Please sign in to comment.