Skip to content

Fix autoprovisioning with spot nodes #187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/xpk/commands/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from ..core.kueue import LOCAL_QUEUE_NAME
from ..core.nap import (
get_autoprovisioning_node_selector_args,
get_autoprovisioning_tolerations,
is_autoprovisioning_enabled,
)
from ..core.pathways import (
Expand Down Expand Up @@ -101,6 +102,8 @@
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
tolerations:
{autoprovisioning_tolerations}
containers:
{container}
volumes:
Expand Down Expand Up @@ -395,6 +398,7 @@ def workload_create(args) -> None:

# Currently autoprovisioning is not enabled for Pathways workloads.
autoprovisioning_args = ''
autoprovisioning_tolerations = ''
autoprovisioning_enabled, return_code = is_autoprovisioning_enabled(
args, system
)
Expand All @@ -407,6 +411,11 @@ def workload_create(args) -> None:
)
if return_code != 0:
xpk_exit(return_code)
autoprovisioning_tolerations, return_code = (
get_autoprovisioning_tolerations(args)
)
if return_code != 0:
xpk_exit(return_code)

# Create the workload file based on accelerator type or workload type.
if system.accelerator_type == AcceleratorType['GPU']:
Expand Down Expand Up @@ -467,6 +476,7 @@ def workload_create(args) -> None:
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
volumes=get_volumes(args, system),
autoprovisioning_tolerations=autoprovisioning_tolerations,
)
tmp = write_tmp_file(yml_string)
command = f'kubectl apply -f {str(tmp.file.name)}'
Expand Down
2 changes: 1 addition & 1 deletion src/xpk/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ def get_capacity_node_selectors_from_capacity_type(
case CapacityType.ON_DEMAND.name:
node_selector = ''
case CapacityType.SPOT.name:
node_selector = 'cloud.google.com/gke-spot="true"'
node_selector = 'cloud.google.com/gke-spot: "true"'
case CapacityType.RESERVATION.name:
node_selector = f'cloud.google.com/reservation-name: {args.reservation}'
case _:
Expand Down
129 changes: 97 additions & 32 deletions src/xpk/core/nap.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,49 @@ def is_autoprovisioning_enabled(
return False, 1


def get_capacity_type_str_from_args_or_cluster_default(args) -> tuple[str, int]:
  """Resolve the capacity type name from the arguments or the cluster default.

  A capacity type resolved from the user arguments takes priority. When the
  arguments do not resolve to a known capacity type, the value stored under
  CAPACITY_TYPE_CONFIG_KEY in the cluster metadata config map is used instead.

  Args:
    args: user provided arguments for running the command.

  Returns:
    Tuple with the capacity type name (CapacityType.UNKNOWN.name on any
    failure) and int of 0 if successful and a non-zero return code otherwise.
  """
  unknown_name = CapacityType.UNKNOWN.name

  # A capacity type coming from the arguments wins over the cluster default.
  requested_type, status = get_capacity_type(args)
  if status != 0:
    xpk_print('Unable to get capacity type.')
    return unknown_name, status
  if requested_type != CapacityType.UNKNOWN:
    return requested_type.name, 0

  # Nothing usable in the arguments: fall back to the settings recorded in the
  # cluster metadata config map. Without that config map there is no default
  # to fall back to, so autoprovisioning cannot continue.
  configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}'
  metadata_map = get_cluster_configmap(args, configmap_name)
  if metadata_map is None:
    xpk_print(
        'Unable to find config map. Please specify a capacity type'
        ' --on-demand, --spot, --reservation=$RESERVATION_ID) to continue'
        ' to use autoprovisioning (--enable-autoprovisioning).'
    )
    return unknown_name, 1

  status, stored_type = get_value_from_map(
      CAPACITY_TYPE_CONFIG_KEY, metadata_map
  )
  return (stored_type, 0) if status == 0 else (unknown_name, status)


def get_autoprovisioning_node_selector_args(args) -> tuple[str, int]:
"""Determine the capacity type when autoprovisioning is enabled.

Expand All @@ -297,44 +340,33 @@ def get_autoprovisioning_node_selector_args(args) -> tuple[str, int]:
"""
return_code = 0
node_selector_args = ''
# If the user doesn't specify args, then use the cluster settings.
capacity_type, return_code = get_capacity_type(args)
capacity_type_str = capacity_type.name
capacity_type_str, return_code = (
get_capacity_type_str_from_args_or_cluster_default(args)
)
if return_code != 0:
xpk_print('Unable to get capacity type.')
return node_selector_args, return_code

if capacity_type_str == CapacityType.UNKNOWN.name:
# Use default settings from cluster creation.
metadata_configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}'
cluster_config_map = get_cluster_configmap(args, metadata_configmap_name)

# Error out if the metadata config map doesn't exist, and is attempting to use
# autoprovisioning.
if cluster_config_map is None:
xpk_print(
'Unable to find config map. Please specify a capacity type'
' --on-demand, --spot, --reservation=$RESERVATION_ID) to continue'
' to use autoprovisioning (--enable-autoprovisioning).'
)
return node_selector_args, 1

return_code, capacity_type_str = get_value_from_map(
CAPACITY_TYPE_CONFIG_KEY, cluster_config_map
cluster_config_map = get_cluster_configmap(
args, f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}'
)
if cluster_config_map is None:
xpk_print(
'Unable to find config map. Please specify a capacity type'
' --on-demand, --spot, --reservation=$RESERVATION_ID) to continue'
' to use autoprovisioning (--enable-autoprovisioning).'
)
return node_selector_args, 1

if capacity_type_str == CapacityType.RESERVATION.name:
return_code, args.reservation = get_value_from_map(
RESERVATION_CONFIG_KEY, cluster_config_map
)
if return_code != 0:
return node_selector_args, return_code

if capacity_type_str == CapacityType.RESERVATION.name:
return_code, args.reservation = get_value_from_map(
RESERVATION_CONFIG_KEY, cluster_config_map
)
if return_code != 0:
return node_selector_args, return_code
return_code = verify_reservation_exists(args)
if return_code > 0:
xpk_print('Unable to verify reservation name saved in config map.')
return node_selector_args, return_code
return_code = verify_reservation_exists(args)
if return_code > 0:
xpk_print('Unable to verify reservation name saved in config map.')
return node_selector_args, return_code

# Check if reservation id is valid. Shared function with cluster creation.
node_selector_args, return_code = (
Expand All @@ -345,3 +377,36 @@ def get_autoprovisioning_node_selector_args(args) -> tuple[str, int]:
return node_selector_args, return_code

return node_selector_args, return_code


def get_autoprovisioning_tolerations(args) -> tuple[str, int]:
"""Determine the pod tolerations when autoprovisioning is enabled.

The capacity type is resolved with
get_capacity_type_str_from_args_or_cluster_default. Only the spot capacity
type produces a toleration; every other capacity type yields an empty string.

Args:
args: user provided arguments for running the command.

Returns:
Tuple with string of autoprovisioning tolerations (a YAML list item
tolerating the GKE spot taint for spot capacity, '' otherwise) and
int of 0 if successful and the non-zero return code of the capacity type
lookup otherwise.
"""
capacity_type_str, return_code = (
get_capacity_type_str_from_args_or_cluster_default(args)
)
if return_code != 0:
# Capacity type lookup failed: propagate its return code with no tolerations.
return '', return_code

if capacity_type_str == CapacityType.SPOT.name:
# https://cloud.google.com/kubernetes-engine/docs/concepts/node-auto-provisioning#support_for_spot_vms
#
# > Creating node pools based on Spot VMs is only considered if
# > unschedulable pods with a toleration for the
# > cloud.google.com/gke-spot="true":NoSchedule taint exist
#
# NOTE(review): the whitespace inside this literal is part of the emitted
# YAML; confirm it lines up with where the workload template places it.
return (
'''- key: "cloud.google.com/gke-spot"
operator: "Equal"
value: "true"
effect: "NoSchedule"''',
0,
)

# Any capacity type other than spot gets no toleration.
return '', 0
Loading