Skip to content

Commit 465da20

Browse files
Refactor creation logic of ingress/routes into RayCluster Controller (#493)
* WIP - Creation/Deletion of ingress and routes in RayCluster Controller
* Always run RayCluster Controller
* Add support file for RC Controller
* Disable OAuth for e2e tests
* Add OAuth config to reconciler struct
1 parent 2b0453e commit 465da20

File tree

4 files changed

+258
-30
lines changed

4 files changed

+258
-30
lines changed

.github/workflows/e2e_tests.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ jobs:
6060
run: |
6161
echo Deploying CodeFlare operator
6262
IMG="${REGISTRY_ADDRESS}"/codeflare-operator
63+
sed -i 's/RayDashboardOAuthEnabled: pointer.Bool(true)/RayDashboardOAuthEnabled: pointer.Bool(false)/' main.go
6364
make image-push -e IMG="${IMG}"
6465
make deploy -e IMG="${IMG}" -e ENV="e2e"
6566
kubectl wait --timeout=120s --for=condition=Available=true deployment -n openshift-operators codeflare-operator-manager

main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ func main() {
188188
}
189189

190190
v, err := HasAPIResourceForGVK(kubeClient.DiscoveryClient, rayv1.GroupVersion.WithKind("RayCluster"))
191-
if v && *cfg.KubeRay.RayDashboardOAuthEnabled {
192-
rayClusterController := controllers.RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}
191+
if v {
192+
rayClusterController := controllers.RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Config: cfg}
193193
exitOnError(rayClusterController.SetupWithManager(mgr), "Error setting up RayCluster controller")
194194
} else if err != nil {
195195
exitOnError(err, "Could not determine if RayCluster CR present on cluster.")

pkg/controllers/raycluster_controller.go

+74-28
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ import (
4141
routev1 "github.com/openshift/api/route/v1"
4242
routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1"
4343
routev1client "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1"
44+
45+
"github.com/project-codeflare/codeflare-operator/pkg/config"
4446
)
4547

4648
// RayClusterReconciler reconciles a RayCluster object
@@ -50,15 +52,17 @@ type RayClusterReconciler struct {
5052
routeClient *routev1client.RouteV1Client
5153
Scheme *runtime.Scheme
5254
CookieSalt string
55+
Config *config.CodeFlareOperatorConfiguration
5356
}
5457

5558
const (
56-
requeueTime = 10
57-
controllerName = "codeflare-raycluster-controller"
58-
oAuthFinalizer = "ray.openshift.ai/oauth-finalizer"
59-
oAuthServicePort = 443
60-
oAuthServicePortName = "oauth-proxy"
61-
logRequeueing = "requeueing"
59+
requeueTime = 10
60+
controllerName = "codeflare-raycluster-controller"
61+
oAuthFinalizer = "ray.openshift.ai/oauth-finalizer"
62+
oAuthServicePort = 443
63+
oAuthServicePortName = "oauth-proxy"
64+
ingressServicePortName = "dashboard"
65+
logRequeueing = "requeueing"
6266
)
6367

6468
var (
@@ -97,6 +101,10 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
97101
return ctrl.Result{}, client.IgnoreNotFound(err)
98102
}
99103

104+
isLocalInteractive := annotationBoolVal(ctx, &cluster, "sdk.codeflare.dev/local_interactive", false)
105+
ingressDomain := "" // FIX - CFO will retrieve it.
106+
isOpenShift, ingressHost := getClusterType(ctx, r.kubeClient, &cluster, ingressDomain)
107+
100108
if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
101109
if !controllerutil.ContainsFinalizer(&cluster, oAuthFinalizer) {
102110
logger.Info("Add a finalizer", "finalizer", oAuthFinalizer)
@@ -130,29 +138,63 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
130138
return ctrl.Result{}, nil
131139
}
132140

133-
_, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
134-
if err != nil {
135-
logger.Error(err, "Failed to update OAuth Route")
136-
}
141+
if cluster.Status.State != "suspended" && r.isRayDashboardOAuthEnabled() && isOpenShift {
142+
logger.Info("Creating OAuth Objects")
143+
_, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
144+
if err != nil {
145+
logger.Error(err, "Failed to update OAuth Route")
146+
return ctrl.Result{RequeueAfter: requeueTime}, err
147+
}
137148

138-
_, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
139-
if err != nil {
140-
logger.Error(err, "Failed to create OAuth Secret")
141-
}
149+
_, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
150+
if err != nil {
151+
logger.Error(err, "Failed to create OAuth Secret")
152+
return ctrl.Result{RequeueAfter: requeueTime}, err
153+
}
142154

143-
_, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
144-
if err != nil {
145-
logger.Error(err, "Failed to update OAuth Service")
146-
}
155+
_, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
156+
if err != nil {
157+
logger.Error(err, "Failed to update OAuth Service")
158+
return ctrl.Result{RequeueAfter: requeueTime}, err
159+
}
147160

148-
_, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
149-
if err != nil {
150-
logger.Error(err, "Failed to update OAuth ServiceAccount")
151-
}
161+
_, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
162+
if err != nil {
163+
logger.Error(err, "Failed to update OAuth ServiceAccount")
164+
return ctrl.Result{RequeueAfter: requeueTime}, err
165+
}
152166

153-
_, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
154-
if err != nil {
155-
logger.Error(err, "Failed to update OAuth ClusterRoleBinding")
167+
_, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
168+
if err != nil {
169+
logger.Error(err, "Failed to update OAuth ClusterRoleBinding")
170+
return ctrl.Result{RequeueAfter: requeueTime}, err
171+
}
172+
173+
if isLocalInteractive {
174+
logger.Info("Creating RayClient Route")
175+
_, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredRayClientRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
176+
if err != nil {
177+
logger.Error(err, "Failed to update RayClient Route")
178+
return ctrl.Result{RequeueAfter: requeueTime}, err
179+
}
180+
}
181+
182+
} else if cluster.Status.State != "suspended" && !r.isRayDashboardOAuthEnabled() && !isOpenShift {
183+
logger.Info("Creating Dashboard Ingress")
184+
_, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredClusterIngress(&cluster, ingressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
185+
if err != nil {
186+
// This log is info level since errors are not fatal and are expected
187+
logger.Info("WARN: Failed to update Dashboard Ingress", "error", err.Error(), logRequeueing, true)
188+
return ctrl.Result{RequeueAfter: requeueTime}, err
189+
}
190+
if isLocalInteractive && ingressDomain != "" {
191+
logger.Info("Creating RayClient Ingress")
192+
_, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredRayClientIngress(&cluster, ingressDomain), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
193+
if err != nil {
194+
logger.Error(err, "Failed to update RayClient Ingress")
195+
return ctrl.Result{RequeueAfter: requeueTime}, err
196+
}
197+
}
156198
}
157199

158200
return ctrl.Result{}, nil
@@ -193,19 +235,23 @@ func desiredServiceAccount(cluster *rayv1.RayCluster) *coreapply.ServiceAccountA
193235
WithAnnotations(map[string]string{
194236
"serviceaccounts.openshift.io/oauth-redirectreference.first": "" +
195237
`{"kind":"OAuthRedirectReference","apiVersion":"v1",` +
196-
`"reference":{"kind":"Route","name":"` + routeNameFromCluster(cluster) + `"}}`,
238+
`"reference":{"kind":"Route","name":"` + dashboardNameFromCluster(cluster) + `"}}`,
197239
}).
198240
WithOwnerReferences(
199241
v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
200242
)
201243
}
202244

203-
func routeNameFromCluster(cluster *rayv1.RayCluster) string {
245+
func dashboardNameFromCluster(cluster *rayv1.RayCluster) string {
204246
return "ray-dashboard-" + cluster.Name
205247
}
206248

249+
func rayClientNameFromCluster(cluster *rayv1.RayCluster) string {
250+
return "rayclient-" + cluster.Name
251+
}
252+
207253
func desiredClusterRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration {
208-
return routeapply.Route(routeNameFromCluster(cluster), cluster.Namespace).
254+
return routeapply.Route(dashboardNameFromCluster(cluster), cluster.Namespace).
209255
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
210256
WithSpec(routeapply.RouteSpec().
211257
WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(oauthServiceNameFromCluster(cluster))).

pkg/controllers/support.go

+181
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"strings"
8+
9+
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
10+
11+
networkingv1 "k8s.io/api/networking/v1"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/apimachinery/pkg/types"
14+
"k8s.io/apimachinery/pkg/util/intstr"
15+
v1 "k8s.io/client-go/applyconfigurations/meta/v1"
16+
networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1"
17+
"k8s.io/client-go/discovery"
18+
"k8s.io/client-go/kubernetes"
19+
"k8s.io/client-go/rest"
20+
ctrl "sigs.k8s.io/controller-runtime"
21+
22+
routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1"
23+
)
24+
25+
func serviceNameFromCluster(cluster *rayv1.RayCluster) string {
26+
return cluster.Name + "-head-svc"
27+
}
28+
29+
func desiredRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration {
30+
return routeapply.Route(rayClientNameFromCluster(cluster), cluster.Namespace).
31+
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
32+
WithSpec(routeapply.RouteSpec().
33+
WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace).
34+
WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster)).WithWeight(100)).
35+
WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString("client"))).
36+
WithTLS(routeapply.TLSConfig().WithTermination("passthrough")),
37+
).
38+
WithOwnerReferences(
39+
v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion),
40+
)
41+
}
42+
43+
// Create an Ingress object for the RayCluster
44+
func desiredRayClientIngress(cluster *rayv1.RayCluster, ingressDomain string) *networkingv1ac.IngressApplyConfiguration {
45+
return networkingv1ac.Ingress(rayClientNameFromCluster(cluster), cluster.Namespace).
46+
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
47+
WithAnnotations(map[string]string{
48+
"nginx.ingress.kubernetes.io/rewrite-target": "/",
49+
"nginx.ingress.kubernetes.io/ssl-redirect": "true",
50+
"nginx.ingress.kubernetes.io/ssl-passthrough": "true",
51+
}).
52+
WithOwnerReferences(v1.OwnerReference().
53+
WithAPIVersion(cluster.APIVersion).
54+
WithKind(cluster.Kind).
55+
WithName(cluster.Name).
56+
WithUID(types.UID(cluster.UID))).
57+
WithSpec(networkingv1ac.IngressSpec().
58+
WithIngressClassName("nginx").
59+
WithRules(networkingv1ac.IngressRule().
60+
WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingressDomain).
61+
WithHTTP(networkingv1ac.HTTPIngressRuleValue().
62+
WithPaths(networkingv1ac.HTTPIngressPath().
63+
WithPath("/").
64+
WithPathType(networkingv1.PathTypeImplementationSpecific).
65+
WithBackend(networkingv1ac.IngressBackend().
66+
WithService(networkingv1ac.IngressServiceBackend().
67+
WithName(serviceNameFromCluster(cluster)).
68+
WithPort(networkingv1ac.ServiceBackendPort().
69+
WithNumber(10001),
70+
),
71+
),
72+
),
73+
),
74+
),
75+
),
76+
)
77+
}
78+
79+
// Create an Ingress object for the RayCluster
80+
func desiredClusterIngress(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration {
81+
return networkingv1ac.Ingress(dashboardNameFromCluster(cluster), cluster.Namespace).
82+
WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}).
83+
WithOwnerReferences(v1.OwnerReference().
84+
WithAPIVersion(cluster.APIVersion).
85+
WithKind(cluster.Kind).
86+
WithName(cluster.Name).
87+
WithUID(types.UID(cluster.UID))).
88+
WithSpec(networkingv1ac.IngressSpec().
89+
WithRules(networkingv1ac.IngressRule().
90+
WithHost(ingressHost). // KinD hostname or ingressDomain
91+
WithHTTP(networkingv1ac.HTTPIngressRuleValue().
92+
WithPaths(networkingv1ac.HTTPIngressPath().
93+
WithPath("/").
94+
WithPathType(networkingv1.PathTypePrefix).
95+
WithBackend(networkingv1ac.IngressBackend().
96+
WithService(networkingv1ac.IngressServiceBackend().
97+
WithName(serviceNameFromCluster(cluster)).
98+
WithPort(networkingv1ac.ServiceBackendPort().
99+
WithName(ingressServicePortName),
100+
),
101+
),
102+
),
103+
),
104+
),
105+
),
106+
)
107+
}
108+
109+
// isOnKindCluster checks if the current cluster is a KinD cluster.
110+
// It searches for a node with a label commonly used by KinD clusters.
111+
func isOnKindCluster(clientset *kubernetes.Clientset) (bool, error) {
112+
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
113+
LabelSelector: "kubernetes.io/hostname=kind-control-plane",
114+
})
115+
if err != nil {
116+
return false, err
117+
}
118+
// If we find one or more nodes with the label, assume it's a KinD cluster.
119+
return len(nodes.Items) > 0, nil
120+
}
121+
122+
// getDiscoveryClient returns a discovery client for the current reconciler
123+
func getDiscoveryClient(config *rest.Config) (*discovery.DiscoveryClient, error) {
124+
return discovery.NewDiscoveryClientForConfig(config)
125+
}
126+
127+
// Check where we are running. We are trying to distinguish here whether
128+
// this is vanilla kubernetes cluster or Openshift
129+
func getClusterType(ctx context.Context, clientset *kubernetes.Clientset, cluster *rayv1.RayCluster, ingressDomain string) (bool, string) {
130+
// The discovery package is used to discover APIs supported by a Kubernetes API server.
131+
logger := ctrl.LoggerFrom(ctx)
132+
config, err := ctrl.GetConfig()
133+
if err != nil && config == nil {
134+
logger.Info("Cannot retrieve config, assuming we're on Vanilla Kubernetes")
135+
return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain)
136+
}
137+
dclient, err := getDiscoveryClient(config)
138+
if err != nil && dclient == nil {
139+
logger.Info("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes")
140+
return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain)
141+
}
142+
apiGroupList, err := dclient.ServerGroups()
143+
if err != nil {
144+
logger.Info("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes")
145+
return false, ""
146+
}
147+
for i := 0; i < len(apiGroupList.Groups); i++ {
148+
if strings.HasSuffix(apiGroupList.Groups[i].Name, ".openshift.io") {
149+
logger.Info("We detected being on OpenShift!")
150+
return true, ""
151+
}
152+
}
153+
onKind, _ := isOnKindCluster(clientset)
154+
if onKind && ingressDomain == "" {
155+
logger.Info("We detected being on a KinD cluster!")
156+
return false, "kind"
157+
}
158+
logger.Info("We detected being on Vanilla Kubernetes!")
159+
return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain)
160+
}
161+
162+
func (r *RayClusterReconciler) isRayDashboardOAuthEnabled() bool {
163+
if r.Config != nil && r.Config.KubeRay != nil && r.Config.KubeRay.RayDashboardOAuthEnabled != nil {
164+
return *r.Config.KubeRay.RayDashboardOAuthEnabled
165+
}
166+
return true
167+
}
168+
169+
func annotationBoolVal(ctx context.Context, cluster *rayv1.RayCluster, annotation string, defaultValue bool) bool {
170+
logger := ctrl.LoggerFrom(ctx)
171+
val, exists := cluster.ObjectMeta.Annotations[annotation]
172+
if !exists || val == "" {
173+
return defaultValue
174+
}
175+
boolVal, err := strconv.ParseBool(val)
176+
if err != nil {
177+
logger.Error(err, "Could not convert annotation value to bool", "annotation", annotation, "value", val)
178+
return defaultValue
179+
}
180+
return boolVal
181+
}

0 commit comments

Comments
 (0)