1
+ import pytest
2
+ from codeflare_sdk .ray .job .job import RayJobSpec , RayJob , RayJobStatus
3
+
4
+
5
def test_ray_job_spec_creation():
    """Build a fully populated RayJobSpec and read every field back.

    ``status`` is deliberately not passed to the constructor; the final
    table entry expects it to read back as ``RayJobStatus.PENDING``.
    """
    job_spec = RayJobSpec(
        entrypoint="python script.py",
        submission_id="test-123",
        runtime_env={"pip": ["numpy", "pandas"]},
        metadata={"author": "test"},
        entrypoint_num_cpus=2.0,
        entrypoint_num_gpus=1.0,
        entrypoint_memory=1024,
        entrypoint_resources={"custom.com/resource": 1.0},
        cluster_name="test-cluster",
        cluster_namespace="test-ns",
    )

    # Expected values are fresh literals rather than the objects handed to the
    # constructor, so the comparison is by value and not by shared identity.
    expected_attributes = [
        ("entrypoint", "python script.py"),
        ("submission_id", "test-123"),
        ("runtime_env", {"pip": ["numpy", "pandas"]}),
        ("metadata", {"author": "test"}),
        ("entrypoint_num_cpus", 2.0),
        ("entrypoint_num_gpus", 1.0),
        ("entrypoint_memory", 1024),
        ("entrypoint_resources", {"custom.com/resource": 1.0}),
        ("cluster_name", "test-cluster"),
        ("cluster_namespace", "test-ns"),
        ("status", RayJobStatus.PENDING),
    ]
    for attribute, expected in expected_attributes:
        assert getattr(job_spec, attribute) == expected
31
+
32
+
33
def test_ray_job_yaml_generation_full():
    """Check the dict produced by RayJob.to_dict() when every spec field is set.

    Covers the three parts of the output in turn: the top-level
    apiVersion/kind/metadata, the "spec" section, and the "status" section.
    No status is passed to RayJob here, and the test expects the status
    section to report ``RayJobStatus.PENDING`` with all other entries None.
    """
    full_spec = RayJobSpec(
        entrypoint="python -c 'import ray; print(ray.cluster_resources())'",
        submission_id="test-submission-456",
        runtime_env={"pip": ["numpy==1.24.0", "pandas"], "env_vars": {"RAY_LOG_LEVEL": "DEBUG"}},
        metadata={"job_timeout_s": "1800", "author": "test-user"},
        entrypoint_num_cpus=2.5,
        entrypoint_num_gpus=1.0,
        entrypoint_memory=2048,
        entrypoint_resources={"custom.com/special": 1.0},
        cluster_name="ml-cluster",
        cluster_namespace="ml-namespace",
    )
    ray_job = RayJob(
        metadata={
            "name": "comprehensive-test-job",
            "namespace": "test-namespace",
            "labels": {"app": "ml-training"},
        },
        spec=full_spec,
    )

    rendered = ray_job.to_dict()

    # Top-level structure.
    assert rendered["apiVersion"] == "ray.io/v1"
    assert rendered["kind"] == "RayJob"
    expected_metadata = [
        ("name", "comprehensive-test-job"),
        ("namespace", "test-namespace"),
        ("labels", {"app": "ml-training"}),
    ]
    for key, expected in expected_metadata:
        assert rendered["metadata"][key] == expected

    # Spec section: checked key by key so that any additional keys the
    # serializer emits do not affect the outcome.
    spec_section = rendered["spec"]
    expected_spec = [
        ("entrypoint", "python -c 'import ray; print(ray.cluster_resources())'"),
        ("submission_id", "test-submission-456"),
        ("runtime_env", {"pip": ["numpy==1.24.0", "pandas"], "env_vars": {"RAY_LOG_LEVEL": "DEBUG"}}),
        ("metadata", {"job_timeout_s": "1800", "author": "test-user"}),
        ("entrypoint_num_cpus", 2.5),
        ("entrypoint_num_gpus", 1.0),
        ("entrypoint_memory", 2048),
        ("entrypoint_resources", {"custom.com/special": 1.0}),
        ("cluster_name", "ml-cluster"),
        ("cluster_namespace", "ml-namespace"),
    ]
    for key, expected in expected_spec:
        assert spec_section[key] == expected

    # Status section: only the state is filled in, the rest must be None.
    status_section = rendered["status"]
    assert status_section["status"] == RayJobStatus.PENDING
    for empty_key in ("message", "start_time", "end_time", "driver_info"):
        assert status_section[empty_key] is None
86
+
87
+
88
def test_ray_job_yaml_generation_minimal():
    """Check RayJob.to_dict() when only the spec's entrypoint is supplied.

    Every other spec key is expected to be present in the output with the
    value None, and the status section is expected to report
    ``RayJobStatus.PENDING``.
    """
    minimal_job = RayJob(
        metadata={"name": "minimal-job"},
        spec=RayJobSpec(entrypoint="python minimal_job.py"),
    )

    rendered = minimal_job.to_dict()

    # Top-level structure.
    assert rendered["apiVersion"] == "ray.io/v1"
    assert rendered["kind"] == "RayJob"
    assert rendered["metadata"]["name"] == "minimal-job"

    # Only the entrypoint carries a value; the remaining keys must exist and be None.
    spec_section = rendered["spec"]
    assert spec_section["entrypoint"] == "python minimal_job.py"
    unset_keys = (
        "submission_id",
        "runtime_env",
        "metadata",
        "entrypoint_num_cpus",
        "entrypoint_num_gpus",
        "entrypoint_memory",
        "entrypoint_resources",
        "cluster_name",
        "cluster_namespace",
    )
    for key in unset_keys:
        assert spec_section[key] is None

    # Status when none was supplied.
    assert rendered["status"]["status"] == RayJobStatus.PENDING
120
+
121
+
122
def test_ray_job_yaml_with_existing_status():
    """Check that a status dict passed to RayJob is what to_dict() reports.

    The spec is created with ``RayJobStatus.PENDING`` while the job itself
    receives a "RUNNING" status dict; the test expects the job-level dict
    to appear in the output.
    """
    # Stand-in for a status block as a Kubernetes controller might report it.
    controller_status = {
        "status": "RUNNING",
        "message": "Job is executing on cluster",
        "start_time": "2023-12-01T10:30:00Z",
        "end_time": None,
        "driver_info": {
            "id": "driver-abc123",
            "node_ip_address": "10.244.1.5",
            "pid": "12345",
        },
    }
    pending_spec = RayJobSpec(
        entrypoint="python running_job.py",
        status=RayJobStatus.PENDING,  # expected to lose out to the job-level status
    )
    running_job = RayJob(
        metadata={"name": "status-test-job", "namespace": "test-ns"},
        spec=pending_spec,
        status=controller_status,
    )

    rendered_status = running_job.to_dict()["status"]

    # The job-level status must be reported, not the spec's PENDING.
    assert rendered_status == controller_status
    assert rendered_status["status"] == "RUNNING"
    assert rendered_status["message"] == "Job is executing on cluster"
    assert rendered_status["start_time"] == "2023-12-01T10:30:00Z"
    assert rendered_status["driver_info"]["id"] == "driver-abc123"
156
+
157
+
158
def test_ray_job_yaml_with_complex_runtime_env():
    """Check that a nested runtime_env comes out of RayJob.to_dict() unchanged.

    The runtime_env mixes lists, nested dicts and plain strings (pip, conda,
    env_vars, working_dir, py_modules). A few scalar spec fields are checked
    alongside it.
    """
    training_spec = RayJobSpec(
        entrypoint="python train_model.py --epochs 100",
        runtime_env={
            "pip": ["torch==1.13.0", "transformers", "datasets"],
            "conda": {"dependencies": ["python=3.9", "cudatoolkit=11.8"]},
            "env_vars": {
                "CUDA_VISIBLE_DEVICES": "0,1",
                "PYTHONPATH": "/opt/ml/code",
                "HF_HOME": "/tmp/huggingface",
            },
            "working_dir": "./training_code",
            "py_modules": ["utils", "models"],
        },
        entrypoint_num_gpus=2.0,
        entrypoint_memory=8192,
    )
    training_job = RayJob(
        metadata={"name": "complex-env-job", "namespace": "ml-training"},
        spec=training_spec,
    )

    rendered_spec = training_job.to_dict()["spec"]
    rendered_env = rendered_spec["runtime_env"]

    # Each part of the runtime_env is compared against a fresh literal.
    assert rendered_env["pip"] == ["torch==1.13.0", "transformers", "datasets"]
    assert rendered_env["conda"]["dependencies"] == ["python=3.9", "cudatoolkit=11.8"]
    expected_env_vars = [
        ("CUDA_VISIBLE_DEVICES", "0,1"),
        ("PYTHONPATH", "/opt/ml/code"),
        ("HF_HOME", "/tmp/huggingface"),
    ]
    for var_name, var_value in expected_env_vars:
        assert rendered_env["env_vars"][var_name] == var_value
    assert rendered_env["working_dir"] == "./training_code"
    assert rendered_env["py_modules"] == ["utils", "models"]

    # Remaining spec fields set by this test.
    assert rendered_spec["entrypoint"] == "python train_model.py --epochs 100"
    assert rendered_spec["entrypoint_num_gpus"] == 2.0
    assert rendered_spec["entrypoint_memory"] == 8192
200
+
201
+
202
def test_ray_job_yaml_different_statuses():
    """Check that RayJob.to_dict() reports whichever status the spec was given.

    Runs the same check for PENDING, RUNNING, SUCCEEDED, FAILED and STOPPED.
    For each one a spec is created with that status, wrapped in a RayJob that
    has no job-level status, and the rendered dict is expected to carry the
    spec's status and entrypoint through unchanged.
    """
    statuses_to_test = [
        RayJobStatus.PENDING,
        RayJobStatus.RUNNING,
        RayJobStatus.SUCCEEDED,
        RayJobStatus.FAILED,
        RayJobStatus.STOPPED,
    ]

    for status in statuses_to_test:
        # Lower-cased once and reused for both the job name and the script name.
        # NOTE(review): relies on RayJobStatus members supporting str.lower()
        # (e.g. string-valued members) - confirm against the enum definition.
        suffix = status.lower()

        # No whitespace after the replacement field: a trailing space would
        # make the metadata name invalid as a Kubernetes object name and
        # would split the script name in the entrypoint command.
        entrypoint = f"python job_{suffix}.py"
        job = RayJob(
            metadata={"name": f"job-{suffix}", "namespace": "test"},
            spec=RayJobSpec(entrypoint=entrypoint, status=status),
        )

        yaml_dict = job.to_dict()

        # The messages name the status under test, because the loop stops at
        # the first failure and would otherwise not say which case broke.
        assert yaml_dict["status"]["status"] == status, f"status not propagated for {status!r}"
        assert yaml_dict["spec"]["entrypoint"] == entrypoint, f"entrypoint changed for {status!r}"
0 commit comments