Enable persistent Ray cluster state via external Redis GCS fault tolerance #821

42 changes: 42 additions & 0 deletions docs/sphinx/user-docs/cluster-configuration.rst
@@ -98,6 +98,48 @@ Custom Volumes/Volume Mounts
| For more information on creating Volumes and Volume Mounts with Python check out the Python Kubernetes docs (`Volumes <https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Volume.md>`__, `Volume Mounts <https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1VolumeMount.md>`__).
| You can also find further information on Volumes and Volume Mounts by visiting the Kubernetes `documentation <https://kubernetes.io/docs/concepts/storage/volumes/>`__.

GCS Fault Tolerance
-------------------
By default, the state of a Ray cluster lives only in the head Pod. Anything that restarts the head Pod therefore loses that state, including the Ray cluster history. To persist the state across restarts, you can enable Global Control Service (GCS) fault tolerance backed by an external Redis store.

To configure GCS fault tolerance, set the following parameters (``enable_gcs_ft`` and ``redis_address`` are required, the others are optional):

.. list-table::
   :header-rows: 1
   :widths: auto

   * - Parameter
     - Description
   * - ``enable_gcs_ft``
     - Boolean to enable GCS fault tolerance
   * - ``redis_address``
     - Address of the external Redis service, ex: "redis:6379"
   * - ``redis_password_secret``
     - Dictionary with 'name' and 'key' fields specifying the Kubernetes secret for Redis password
   * - ``external_storage_namespace``
     - Custom storage namespace for GCS fault tolerance (by default, KubeRay sets it to the RayCluster's UID)

Example configuration:

.. code:: python

   from codeflare_sdk import Cluster, ClusterConfiguration

   cluster = Cluster(ClusterConfiguration(
       name='ray-cluster-with-persistence',
       num_workers=2,
       enable_gcs_ft=True,
       redis_address="redis:6379",
       redis_password_secret={
           "name": "redis-password-secret",
           "key": "password"
       },
       # external_storage_namespace="my-custom-namespace" # Optional: Custom namespace for GCS data in Redis
   ))

.. note::
   You need to have a Redis instance deployed in your Kubernetes cluster before using this feature.

Deprecating Parameters
----------------------

25 changes: 25 additions & 0 deletions src/codeflare_sdk/ray/cluster/build_ray_cluster.py
@@ -170,6 +170,31 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"):
        },
    }

    if cluster.config.enable_gcs_ft:
        if not cluster.config.redis_address:
            raise ValueError(
                "redis_address must be provided when enable_gcs_ft is True"
            )

        gcs_ft_options = {"redisAddress": cluster.config.redis_address}

        if cluster.config.external_storage_namespace:
            gcs_ft_options[
                "externalStorageNamespace"
            ] = cluster.config.external_storage_namespace

        if cluster.config.redis_password_secret:
            gcs_ft_options["redisPassword"] = {
                "valueFrom": {
                    "secretKeyRef": {
                        "name": cluster.config.redis_password_secret["name"],
                        "key": cluster.config.redis_password_secret["key"],
                    }
                }
            }

        resource["spec"]["gcsFaultToleranceOptions"] = gcs_ft_options

    config_check()
    k8s_client = get_api_client() or client.ApiClient()
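
For review purposes, the new branch can be read in isolation as a pure function. The following is a minimal sketch, not code from this PR: `build_gcs_ft_options` and `example_options` are hypothetical names, and the function takes plain arguments instead of a `cluster.config` object so that it runs without a Kubernetes cluster.

```python
from typing import Dict, Optional


def build_gcs_ft_options(
    redis_address: Optional[str],
    redis_password_secret: Optional[Dict[str, str]] = None,
    external_storage_namespace: Optional[str] = None,
) -> dict:
    """Hypothetical helper mirroring the gcsFaultToleranceOptions branch."""
    if not redis_address:
        raise ValueError("redis_address must be provided when enable_gcs_ft is True")

    # The Redis address is the only mandatory key.
    options = {"redisAddress": redis_address}

    # Optional: custom storage namespace for the GCS data.
    if external_storage_namespace:
        options["externalStorageNamespace"] = external_storage_namespace

    # Optional: reference the password through a Kubernetes Secret
    # rather than embedding it in the spec.
    if redis_password_secret:
        options["redisPassword"] = {
            "valueFrom": {
                "secretKeyRef": {
                    "name": redis_password_secret["name"],
                    "key": redis_password_secret["key"],
                }
            }
        }
    return options


# Same values as the documentation example in this PR.
example_options = build_gcs_ft_options(
    "redis:6379",
    redis_password_secret={"name": "redis-password-secret", "key": "password"},
)
```

With the values from the documentation example, the result contains `redisAddress` and a `redisPassword` secret reference, and omits `externalStorageNamespace` because none was given.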

44 changes: 40 additions & 4 deletions src/codeflare_sdk/ray/cluster/config.py
@@ -100,6 +100,14 @@ class ClusterConfiguration:
            A list of V1Volume objects to add to the Cluster
        volume_mounts:
            A list of V1VolumeMount objects to add to the Cluster
        enable_gcs_ft:
            A boolean indicating whether to enable GCS fault tolerance.
        redis_address:
            The address of the Redis server to use for GCS fault tolerance, required when enable_gcs_ft is True.
        redis_password_secret:
            Kubernetes secret reference containing Redis password. ex: {"name": "secret-name", "key": "password-key"}
        external_storage_namespace:
            The storage namespace to use for GCS fault tolerance. By default, KubeRay sets it to the UID of RayCluster.
    """

    name: str
@@ -142,13 +150,38 @@ class ClusterConfiguration:
    annotations: Dict[str, str] = field(default_factory=dict)
    volumes: list[V1Volume] = field(default_factory=list)
    volume_mounts: list[V1VolumeMount] = field(default_factory=list)
    enable_gcs_ft: bool = False
    redis_address: Optional[str] = None
    redis_password_secret: Optional[Dict[str, str]] = None
    external_storage_namespace: Optional[str] = None

    def __post_init__(self):
        if not self.verify_tls:
            print(
                "Warning: TLS verification has been disabled - Endpoint checks will be bypassed"
            )

        if self.enable_gcs_ft:
            if not self.redis_address:
                raise ValueError(
                    "redis_address must be provided when enable_gcs_ft is True"
                )

            if self.redis_password_secret and not isinstance(
                self.redis_password_secret, dict
            ):
                raise ValueError(
                    "redis_password_secret must be a dictionary with 'name' and 'key' fields"
                )

            if self.redis_password_secret and (
                "name" not in self.redis_password_secret
                or "key" not in self.redis_password_secret
            ):
                raise ValueError(
                    "redis_password_secret must contain both 'name' and 'key' fields"
                )

        self._validate_types()
        self._memory_to_resource()
        self._memory_to_string()
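
The validation rules added to `__post_init__` can be tried out without installing the SDK. The sketch below is an illustration only: `GcsFtSettings` and `validation_error` are hypothetical names, the dataclass carries just the four new fields, and it assumes the secret checks are nested under the `enable_gcs_ft` branch (the indentation is not recoverable from this page).

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class GcsFtSettings:
    """Hypothetical stand-in holding only the four new configuration fields."""

    enable_gcs_ft: bool = False
    redis_address: Optional[str] = None
    redis_password_secret: Optional[Dict[str, str]] = None
    external_storage_namespace: Optional[str] = None

    def __post_init__(self):
        # Assumption: nothing is validated unless the feature is switched on.
        if not self.enable_gcs_ft:
            return
        if not self.redis_address:
            raise ValueError(
                "redis_address must be provided when enable_gcs_ft is True"
            )
        secret = self.redis_password_secret
        if secret and not isinstance(secret, dict):
            raise ValueError(
                "redis_password_secret must be a dictionary with 'name' and 'key' fields"
            )
        if secret and ("name" not in secret or "key" not in secret):
            raise ValueError(
                "redis_password_secret must contain both 'name' and 'key' fields"
            )


def validation_error(**kwargs) -> Optional[str]:
    """Return the validation message for the given settings, or None if valid."""
    try:
        GcsFtSettings(**kwargs)
    except ValueError as exc:
        return str(exc)
    return None
```

For example, `validation_error(enable_gcs_ft=True)` returns the `redis_address` message, while a secret dictionary that lacks `key` returns the "must contain both" message.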
@@ -283,10 +316,13 @@ def check_type(value, expected_type):
                 else:
                     return True
             if origin_type is dict:
-                return all(
-                    check_type(k, args[0]) and check_type(v, args[1])
-                    for k, v in value.items()
-                )
+                if value is not None:
+                    return all(
+                        check_type(k, args[0]) and check_type(v, args[1])
+                        for k, v in value.items()
+                    )
+                else:
+                    return True
             if origin_type is tuple:
                 return all(check_type(elem, etype) for elem, etype in zip(value, args))
             if expected_type is int:
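
The `None` guard in the dict branch matters for the new `redis_password_secret: Optional[Dict[str, str]] = None` field: when a `Union` is checked member by member, the `Dict[str, str]` member is tried with the value `None`, and calling `.items()` on `None` would raise `AttributeError`. The sketch below is a simplified, self-contained version of such a checker, not the SDK's full implementation. It handles only the `Union` and `dict` cases and falls back to `isinstance` for everything else.

```python
from typing import Dict, Optional, Union, get_args, get_origin


def check_type(value, expected_type) -> bool:
    """Simplified recursive type check covering only Union and dict."""
    origin_type = get_origin(expected_type)
    args = get_args(expected_type)
    if origin_type is Union:
        # Optional[X] is Union[X, None]; each member is tried in turn.
        return any(check_type(value, member) for member in args)
    if origin_type is dict:
        # Guard: None has no .items(), so accept it here instead of raising.
        if value is None:
            return True
        return all(
            check_type(k, args[0]) and check_type(v, args[1])
            for k, v in value.items()
        )
    # Fallback for plain classes such as str, int, or NoneType.
    return isinstance(value, expected_type)


secret_type = Optional[Dict[str, str]]
unset_ok = check_type(None, secret_type)
valid_ok = check_type({"name": "redis-password-secret", "key": "password"}, secret_type)
wrong_value_ok = check_type({"name": 1}, secret_type)
```

Here `unset_ok` and `valid_ok` are `True`, and `wrong_value_ok` is `False` because the dictionary value is not a string.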