Skip to content

Commit 4739884

Browse files
committed
Refactoring: Organize functions from core.py into classes
1 parent c7b4958 commit 4739884

33 files changed

+3043
-2984
lines changed

src/xpk/commands/batch.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,24 @@
1414
limitations under the License.
1515
"""
1616

17+
import re
1718
from argparse import Namespace
1819

19-
from ..core.cluster import setup_k8s_env, create_k8s_service_account
20+
from ..core.cluster import ClusterManager
2021
from ..core.commands import run_command_for_value
21-
from ..core.config import GCS_FUSE_ANNOTATION_KEY, GCS_FUSE_ANNOTATION_VALUE, XPK_SA, DEFAULT_NAMESPACE
22-
from ..core.gcloud_context import add_zone_and_project
22+
from ..core.config import (
23+
DEFAULT_NAMESPACE,
24+
GCS_FUSE_ANNOTATION_KEY,
25+
GCS_FUSE_ANNOTATION_VALUE,
26+
XPK_SA,
27+
)
28+
from ..core.gcloud_context import GCloudContextManager
29+
from ..core.kjob import AppProfileDefaults, Kueue_TAS_annotation, prepare_kjob
2330
from ..core.kueue import LOCAL_QUEUE_NAME
2431
from ..core.storage import get_auto_mount_gcsfuse_storages
2532
from ..utils.console import xpk_exit, xpk_print
2633
from .common import set_cluster_command
27-
from ..core.kjob import AppProfileDefaults, prepare_kjob, Kueue_TAS_annotation
2834
from .kind import set_local_cluster_command
29-
import re
3035

3136

3237
def batch(args: Namespace) -> None:
@@ -38,7 +43,7 @@ def batch(args: Namespace) -> None:
3843
None
3944
"""
4045
if not args.kind_cluster:
41-
add_zone_and_project(args)
46+
GCloudContextManager.add_zone_and_project(args)
4247
set_cluster_command_code = set_cluster_command(args)
4348
else:
4449
set_cluster_command_code = set_local_cluster_command(args)
@@ -54,8 +59,9 @@ def batch(args: Namespace) -> None:
5459

5560

5661
def submit_job(args: Namespace) -> None:
57-
k8s_api_client = setup_k8s_env(args)
58-
create_k8s_service_account(XPK_SA, DEFAULT_NAMESPACE)
62+
cluster_manager = ClusterManager(args, None)
63+
k8s_api_client = cluster_manager.setup_k8s_env()
64+
cluster_manager.create_k8s_service_account(XPK_SA, DEFAULT_NAMESPACE)
5965
gcs_fuse_storages = get_auto_mount_gcsfuse_storages(k8s_api_client)
6066

6167
cmd = (

src/xpk/commands/cluster.py

Lines changed: 62 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,12 @@
1616

1717
from tabulate import tabulate
1818

19-
from ..core.capacity import H100_DEVICE_TYPE
20-
from ..core.cluster import (
21-
get_all_clusters_programmatic,
22-
get_cluster_credentials,
23-
install_nccl_on_cluster,
24-
set_jobset_on_cluster,
25-
setup_k8s_env,
26-
update_cluster_with_gcsfuse_driver_if_necessary,
27-
update_cluster_with_workload_identity_if_necessary,
28-
)
19+
from ..core.capacity import CapacityManager, DeviceType
20+
from ..core.cluster import ClusterManager
2921
from ..core.cluster_private import authorize_private_cluster_access_if_necessary
3022
from ..core.commands import run_command_for_value, run_command_with_updates
3123
from ..core.config import VERTEX_TENSORBOARD_FEATURE_FLAG
32-
from ..core.gcloud_context import (
33-
add_zone_and_project,
34-
get_gke_control_plane_version,
35-
get_gke_server_config,
36-
zone_to_region,
37-
)
24+
from ..core.gcloud_context import GCloudContextManager, GKEVersionManager
3825
from ..core.kjob import apply_kjob_crds, prepare_kjob, verify_kjob_installed
3926
from ..core.kueue import (
4027
cluster_preheat_yml,
@@ -43,28 +30,23 @@
4330
wait_for_kueue_available,
4431
)
4532
from ..core.nap import enable_autoprovisioning_on_cluster
46-
from ..core.network import (
47-
create_cluster_network_config,
48-
delete_cluster_subnets,
49-
set_up_cluster_network_for_gpu,
50-
)
51-
from ..core.nodepool import get_gke_node_pool_version, run_gke_node_pool_create_command
33+
from ..core.network import ClusterNetworkManager
34+
from ..core.nodepool import NodePoolManager
5235
from ..core.ray import install_ray_cluster
53-
from ..core.resources import create_cluster_configmaps
36+
from ..core.resources import ResourceManager
5437
from ..core.storage import install_storage_crd
5538
from ..core.system_characteristics import (
5639
AcceleratorType,
5740
AcceleratorTypeToAcceleratorCharacteristics,
5841
SystemCharacteristics,
5942
get_system_characteristics,
6043
)
61-
from ..core.vertex import create_vertex_tensorboard
44+
from ..core.vertex import VertexAI
6245
from ..core.workload import get_workload_list
6346
from ..utils.console import get_user_input, xpk_exit, xpk_print
6447
from ..utils.file import write_tmp_file
6548
from . import cluster_gcluster
6649
from .common import set_cluster_command
67-
from ..core.cluster import update_cluster_with_gcpfilestore_driver_if_necessary
6850

6951

7052
def cluster_create(args) -> None:
@@ -77,13 +59,25 @@ def cluster_create(args) -> None:
7759
0 if successful and 1 otherwise.
7860
"""
7961
system, return_code = get_system_characteristics(args)
80-
8162
if return_code > 0:
8263
xpk_print('Fetching system characteristics failed!')
8364
xpk_exit(return_code)
8465

8566
xpk_print(f'Starting cluster create for cluster {args.cluster}:', flush=True)
86-
add_zone_and_project(args)
67+
GCloudContextManager.add_zone_and_project(args)
68+
cluster_manager = ClusterManager(args, system)
69+
capacity_manager = CapacityManager(args)
70+
resource_manager = ResourceManager(args, capacity_manager, system)
71+
nodepools_manager = NodePoolManager(
72+
args, system, resource_manager, capacity_manager
73+
)
74+
network_manager = ClusterNetworkManager(args)
75+
vertex_ai = VertexAI(args, resource_manager)
76+
try:
77+
version_manager = GKEVersionManager(args)
78+
except RuntimeError as e:
79+
xpk_print(e)
80+
xpk_exit(1)
8781

8882
if system.device_type in cluster_gcluster.supported_device_types:
8983
xpk_print(
@@ -93,12 +87,8 @@ def cluster_create(args) -> None:
9387
cluster_gcluster.cluster_create(args)
9488
xpk_exit(0)
9589

96-
return_code, gke_server_config = get_gke_server_config(args)
97-
if return_code != 0:
98-
xpk_exit(return_code)
99-
100-
return_code, gke_control_plane_version = get_gke_control_plane_version(
101-
args, gke_server_config
90+
return_code, gke_control_plane_version = (
91+
version_manager.get_gke_control_plane_version()
10292
)
10393
if return_code != 0:
10494
xpk_exit(return_code)
@@ -123,60 +113,61 @@ def cluster_create(args) -> None:
123113
or args.enable_gcpfilestore_csi_driver
124114
):
125115
update_cluster_command_code = (
126-
update_cluster_with_workload_identity_if_necessary(args)
116+
cluster_manager.update_cluster_with_workload_identity_if_necessary()
127117
)
128118
if update_cluster_command_code != 0:
129119
xpk_exit(update_cluster_command_code)
130120

131121
# Enable GCSFuse CSI Driver if not enabled already.
132122
if args.enable_gcsfuse_csi_driver:
133123
update_cluster_command_code = (
134-
update_cluster_with_gcsfuse_driver_if_necessary(args)
124+
cluster_manager.update_cluster_with_gcsfuse_driver_if_necessary()
135125
)
136126
if update_cluster_command_code != 0:
137127
xpk_exit(update_cluster_command_code)
138128

139129
if args.enable_gcpfilestore_csi_driver:
140130
update_cluster_command_code = (
141-
update_cluster_with_gcpfilestore_driver_if_necessary(args)
131+
cluster_manager.update_cluster_with_gcpfilestore_driver_if_necessary()
142132
)
143133
if update_cluster_command_code != 0:
144134
xpk_exit(update_cluster_command_code)
145135

146136
# Update Pathways clusters with CloudDNS if not enabled already.
147137

148-
get_cluster_credentials(args)
138+
cluster_manager.get_cluster_credentials()
149139

150140
# create Vertex Tensorboard for new and existing clusters if create-vertex-tensorboard is set
151141
tensorboard_config = {}
152142
if VERTEX_TENSORBOARD_FEATURE_FLAG and args.create_vertex_tensorboard:
153-
tensorboard_config = create_vertex_tensorboard(args)
143+
tensorboard_config = vertex_ai.create_vertex_tensorboard()
154144
# exit if failed to create Tensorboard in Vertex AI
155145
if not tensorboard_config:
156146
xpk_exit(1)
157147

158148
if system.accelerator_type == AcceleratorType['GPU']:
159149
xpk_print('Setting up Network for cluster')
160-
set_up_cluster_network_code = set_up_cluster_network_for_gpu(args, system)
150+
set_up_cluster_network_code = network_manager.set_up_network_for_gpu(system)
161151
if set_up_cluster_network_code != 0:
162152
xpk_exit(set_up_cluster_network_code)
163153

164-
if system.device_type == H100_DEVICE_TYPE:
154+
if system.device_type == DeviceType.H100.value:
165155
xpk_print('Creating Network Config for cluster')
166-
create_cluster_network_config_code = create_cluster_network_config(args)
156+
create_cluster_network_config_code = network_manager.create_network_config()
167157
if create_cluster_network_config_code != 0:
168158
xpk_exit(create_cluster_network_config_code)
169159

170160
# Check the control plane version of the cluster and determine the node pool
171161
# version to use.
172-
return_code, gke_node_pool_version = get_gke_node_pool_version(
173-
args, gke_server_config
162+
163+
return_code, gke_node_pool_version = (
164+
nodepools_manager.get_gke_node_pool_version(version_manager)
174165
)
175166
if return_code != 0:
176167
xpk_exit(return_code)
177168

178-
run_gke_node_pool_create_command_code = run_gke_node_pool_create_command(
179-
args, system, gke_node_pool_version
169+
run_gke_node_pool_create_command_code = (
170+
nodepools_manager.run_gke_node_pool_create_command(gke_node_pool_version)
180171
)
181172
if run_gke_node_pool_create_command_code != 0:
182173
xpk_exit(run_gke_node_pool_create_command_code)
@@ -193,8 +184,8 @@ def cluster_create(args) -> None:
193184
xpk_exit(return_code)
194185

195186
xpk_print('Creating ConfigMap for cluster')
196-
create_cluster_configmaps_code = create_cluster_configmaps(
197-
args, system, tensorboard_config, autoprovisioning_config
187+
create_cluster_configmaps_code = resource_manager.create_cluster_configmaps(
188+
tensorboard_config, autoprovisioning_config
198189
)
199190
if create_cluster_configmaps_code != 0:
200191
xpk_exit(create_cluster_configmaps_code)
@@ -203,7 +194,7 @@ def cluster_create(args) -> None:
203194
'Enabling the jobset API on our cluster, to be deprecated when Jobset is'
204195
' globally available'
205196
)
206-
set_jobset_on_cluster_code = set_jobset_on_cluster(args)
197+
set_jobset_on_cluster_code = cluster_manager.set_jobset_on_cluster()
207198
if set_jobset_on_cluster_code != 0:
208199
xpk_exit(set_jobset_on_cluster_code)
209200

@@ -226,7 +217,7 @@ def cluster_create(args) -> None:
226217
if err_code > 0:
227218
xpk_exit(err_code)
228219

229-
k8s_client = setup_k8s_env(args)
220+
k8s_client = cluster_manager.setup_k8s_env()
230221
install_storage_crd(k8s_client)
231222

232223
xpk_print('Wait for Kueue to be fully available')
@@ -243,7 +234,7 @@ def cluster_create(args) -> None:
243234

244235
if system.accelerator_type == AcceleratorType['GPU']:
245236
xpk_print('Installing NCCL Plugin for cluster')
246-
install_nccl_code = install_nccl_on_cluster(args, system)
237+
install_nccl_code = cluster_manager.install_nccl_on_cluster()
247238
if install_nccl_code != 0:
248239
xpk_exit(install_nccl_code)
249240

@@ -257,7 +248,7 @@ def cluster_create(args) -> None:
257248
xpk_print(
258249
'See your GKE Cluster here:'
259250
# pylint: disable=line-too-long
260-
f' https://console.cloud.google.com/kubernetes/clusters/details/{zone_to_region(args.zone)}/{args.cluster}/details?project={args.project}'
251+
f' https://console.cloud.google.com/kubernetes/clusters/details/{GCloudContextManager.zone_to_region(args.zone)}/{args.cluster}/details?project={args.project}'
261252
)
262253
xpk_exit(0)
263254

@@ -272,7 +263,7 @@ def cluster_delete(args) -> None:
272263
0 if successful and 1 otherwise.
273264
"""
274265
xpk_print(f'Starting cluster delete for cluster: {args.cluster}', flush=True)
275-
add_zone_and_project(args)
266+
GCloudContextManager.add_zone_and_project(args)
276267

277268
if cluster_gcluster.created_by_gcluster(args):
278269
xpk_print(f'Deleting {args.cluster} cluster using Cluster Toolkit...')
@@ -303,14 +294,13 @@ def cluster_cacheimage(args) -> None:
303294
xpk_print(
304295
f'Starting cluster cacheimage for cluster: {args.cluster}', flush=True
305296
)
306-
add_zone_and_project(args)
307-
308-
get_cluster_credentials(args)
297+
GCloudContextManager.add_zone_and_project(args)
309298
system, return_code = get_system_characteristics(args)
310-
311299
if return_code > 0:
312300
xpk_print('Fetching system characteristics failed!')
313301
xpk_exit(return_code)
302+
cluster_manager = ClusterManager(args, system)
303+
cluster_manager.get_cluster_credentials()
314304

315305
node_selector_key = AcceleratorTypeToAcceleratorCharacteristics[
316306
system.accelerator_type
@@ -352,9 +342,13 @@ def cluster_describe(args) -> None:
352342
0 if successful and 1 otherwise.
353343
"""
354344
xpk_print(f'Starting nodepool list for cluster: {args.cluster}', flush=True)
355-
add_zone_and_project(args)
356-
357-
get_cluster_credentials(args)
345+
GCloudContextManager.add_zone_and_project(args)
346+
system, return_code = get_system_characteristics(args)
347+
if return_code > 0:
348+
xpk_print('Fetching system characteristics failed!')
349+
xpk_exit(return_code)
350+
cluster_manager = ClusterManager(args, system)
351+
cluster_manager.get_cluster_credentials()
358352

359353
return_code, data_table = nodepools_build_table(args)
360354
if return_code != 0:
@@ -583,7 +577,7 @@ def cluster_list(args) -> None:
583577
Returns:
584578
0 if successful and 1 otherwise.
585579
"""
586-
add_zone_and_project(args)
580+
GCloudContextManager.add_zone_and_project(args)
587581
xpk_print(f'For project {args.project} and zone {args.zone}:', flush=True)
588582
if run_gke_clusters_list_command(args):
589583
xpk_exit(1)
@@ -631,7 +625,8 @@ def create_cluster_if_necessary(
631625
Returns:
632626
0 if successful and 1 otherwise.
633627
"""
634-
all_clusters, return_code = get_all_clusters_programmatic(args)
628+
cluster_manager = ClusterManager(args, system)
629+
all_clusters, return_code = cluster_manager.get_all_clusters_programmatic()
635630
if return_code > 0:
636631
xpk_print('Listing all clusters failed!')
637632
return 1
@@ -653,6 +648,8 @@ def run_gke_cluster_delete_command(args) -> int:
653648
Returns:
654649
0 if successful and 1 otherwise.
655650
"""
651+
network_manager = ClusterNetworkManager(args)
652+
656653
if not args.force:
657654
xpk_print('Get the name of the workloads in the cluster.')
658655
args.filter_by_status = 'EVERYTHING'
@@ -675,15 +672,15 @@ def run_gke_cluster_delete_command(args) -> int:
675672
command = (
676673
'gcloud beta container clusters delete'
677674
f' {args.cluster} --project={args.project}'
678-
f' --region={zone_to_region(args.zone)} --quiet'
675+
f' --region={GCloudContextManager.zone_to_region(args.zone)} --quiet'
679676
)
680677

681678
return_code = run_command_with_updates(command, 'Cluster Delete', args)
682679
if return_code != 0:
683680
xpk_print(f'Cluster delete request returned ERROR {return_code}')
684681
return 1
685682

686-
return_code = delete_cluster_subnets(args)
683+
return_code = network_manager.delete_subnets()
687684
if return_code != 0:
688685
return return_code
689686

@@ -701,7 +698,7 @@ def run_gke_clusters_list_command(args) -> int:
701698
"""
702699
command = (
703700
'gcloud container clusters list'
704-
f' --project={args.project} --region={zone_to_region(args.zone)}'
701+
f' --project={args.project} --region={GCloudContextManager.zone_to_region(args.zone)}'
705702
)
706703
return_code = run_command_with_updates(command, 'Cluster List', args)
707704
if return_code != 0:
@@ -748,7 +745,7 @@ def run_gke_cluster_create_command(
748745
command = (
749746
'gcloud beta container clusters create'
750747
f' {args.cluster} --project={args.project}'
751-
f' --region={zone_to_region(args.zone)}'
748+
f' --region={GCloudContextManager.zone_to_region(args.zone)}'
752749
f' --node-locations={args.zone}'
753750
f' --cluster-version={gke_control_plane_version}'
754751
f' --machine-type={machine_type}'

0 commit comments

Comments
 (0)