From 08fb2cf21e90ba341aced6b00e42ee9ef94f3d5b Mon Sep 17 00:00:00 2001
From: kryanbeane
Date: Mon, 1 Dec 2025 15:27:03 +0000
Subject: [PATCH] add ray auth token support

Add an enable_ray_token_auth option to ClusterConfiguration (default
True). When it is set, the generated RayCluster carries
spec.authOptions.mode: "token".

Cluster gains get_ray_auth_token(), which reads the "auth_token" entry
of the Secret named after the cluster, and _setup_ray_auth_env(), which
exports RAY_AUTH_TOKEN / RAY_AUTH_MODE. cluster_uri() and job_client
call the latter; job_client also sends the token as a Bearer header.

---
Review notes (ignored by git am):
- Hand-edited in review: dropped the unused `import ray`, guarded against
  a Secret without data, and made job_client read the Secret and probe for
  OpenShift once instead of twice.
- Hunk headers and the diffstat were recomputed; the post-image blob id on
  the cluster.py index line was not.

 .../ray/cluster/build_ray_cluster.py          |   6 ++
 src/codeflare_sdk/ray/cluster/cluster.py      | 106 ++++++++++++++++--
 src/codeflare_sdk/ray/cluster/config.py       |   7 ++
 3 files changed, 108 insertions(+), 11 deletions(-)

diff --git a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py
index 6a3984b1..ec731509 100644
--- a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py
+++ b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py
@@ -174,6 +174,12 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"):
         },
     }
 
+    # Enable Ray token authentication by default for defense-in-depth security.
+    # KubeRay v1.5.1+ creates a Secret with a randomly generated token and injects
+    # RAY_AUTH_TOKEN and RAY_AUTH_MODE environment variables into all Ray containers.
+    if cluster.config.enable_ray_token_auth:
+        resource["spec"]["authOptions"] = {"mode": "token"}
+
     if cluster.config.enable_gcs_ft:
         if not cluster.config.redis_address:
             raise ValueError(
diff --git a/src/codeflare_sdk/ray/cluster/cluster.py b/src/codeflare_sdk/ray/cluster/cluster.py
index 7167ea1a..ab33c1fb 100644
--- a/src/codeflare_sdk/ray/cluster/cluster.py
+++ b/src/codeflare_sdk/ray/cluster/cluster.py
@@ -56,6 +56,7 @@
 import yaml
 import os
 import requests
+import base64
 
 from kubernetes import config
 from kubernetes.dynamic import DynamicClient
@@ -114,21 +115,98 @@ def _client_headers(self):
     def _client_verify_tls(self):
         return _is_openshift_cluster and self.config.verify_tls
 
+    def get_ray_auth_token(self) -> Optional[str]:
+        """
+        Retrieve the Ray auth token from the cluster's Secret.
+
+        When Ray token authentication is enabled (authOptions.mode: "token"),
+        KubeRay creates a Secret with the same name as the cluster containing
+        a randomly generated token. This method retrieves that token for use
+        with JobSubmissionClient and ray.init().
+
+        Returns:
+            The Ray auth token string, or None if token auth is disabled,
+            the Secret doesn't exist yet (cluster not ready), or the Secret
+            has no "auth_token" entry.
+
+        Raises:
+            ApiException: If reading the Secret fails with a status other
+                than 404; such errors are re-raised unchanged.
+
+        Note:
+            Requires read access to Secrets in the cluster's namespace.
+        """
+        if not self.config.enable_ray_token_auth:
+            return None
+
+        secret_name = self.config.name  # KubeRay uses cluster name as Secret name
+        namespace = self.config.namespace
+
+        try:
+            config_check()
+            core_v1 = client.CoreV1Api(get_api_client())
+            secret = core_v1.read_namespaced_secret(secret_name, namespace)
+        except ApiException as e:
+            if e.status == 404:
+                # Secret not created yet (cluster not ready) or auth disabled
+                return None
+            raise
+
+        # KubeRay stores the token under the "auth_token" key. secret.data is
+        # None when the Secret has no entries, so guard before the lookup.
+        token_b64 = (secret.data or {}).get("auth_token")
+        if not token_b64:
+            return None
+        return base64.b64decode(token_b64).decode("utf-8")
+
+    def _setup_ray_auth_env(self) -> Optional[str]:
+        """
+        Automatically set up Ray token authentication environment variables.
+
+        This method retrieves the Ray auth token from the KubeRay-created Secret
+        and sets RAY_AUTH_TOKEN and RAY_AUTH_MODE environment variables. This
+        allows users to use standard ray.init() without manual token setup.
+
+        Called automatically when users access cluster_uri() or job_client.
+
+        Returns:
+            The token that was exported, or None if no token is available
+            (in which case the environment is left untouched).
+        """
+        # get_ray_auth_token() already returns None when token auth is disabled.
+        ray_token = self.get_ray_auth_token()
+        if ray_token:
+            os.environ["RAY_AUTH_TOKEN"] = ray_token
+            os.environ["RAY_AUTH_MODE"] = "token"
+        return ray_token
+
     @property
     def job_client(self):
-        k8client = get_api_client()
         if self._job_submission_client:
             return self._job_submission_client
-        if _is_openshift_cluster():
-            self._job_submission_client = JobSubmissionClient(
-                self.cluster_dashboard_uri(),
-                headers=self._client_headers,
-                verify=self._client_verify_tls,
-            )
-        else:
-            self._job_submission_client = JobSubmissionClient(
-                self.cluster_dashboard_uri()
-            )
+
+        # Export RAY_AUTH_TOKEN / RAY_AUTH_MODE and keep the token that was
+        # fetched so the Secret is read from the API server only once.
+        ray_token = self._setup_ray_auth_env()
+
+        # Start with K8s auth headers (for OAuth proxy on OpenShift)
+        on_openshift = _is_openshift_cluster()
+        headers = {}
+        if on_openshift:
+            headers = self._client_headers.copy()
+
+        # Add Ray token auth if available (defense-in-depth)
+        # NOTE(review): this replaces any "Authorization" entry copied from
+        # _client_headers above -- confirm the OpenShift OAuth proxy does not
+        # rely on that header before merging.
+        if ray_token:
+            headers["Authorization"] = f"Bearer {ray_token}"
+
+        self._job_submission_client = JobSubmissionClient(
+            self.cluster_dashboard_uri(),
+            headers=headers if headers else None,
+            verify=self._client_verify_tls if on_openshift else True,
+        )
         return self._job_submission_client
 
     def create_resource(self):
@@ -498,7 +576,13 @@ def details(self, print_to_console: bool = True) -> RayCluster:
     def cluster_uri(self) -> str:
         """
         Returns a string containing the cluster's URI.
+
+        Automatically sets up Ray token authentication environment variables
+        (RAY_AUTH_TOKEN and RAY_AUTH_MODE) if token auth is enabled, allowing
+        users to use standard ray.init() without manual setup.
         """
+        # Automatically set up Ray auth environment variables
+        self._setup_ray_auth_env()
         return f"ray://{self.config.name}-head-svc.{self.config.namespace}.svc:10001"
 
     def cluster_dashboard_uri(self) -> str:
diff --git a/src/codeflare_sdk/ray/cluster/config.py b/src/codeflare_sdk/ray/cluster/config.py
index 7759202b..e8a2b235 100644
--- a/src/codeflare_sdk/ray/cluster/config.py
+++ b/src/codeflare_sdk/ray/cluster/config.py
@@ -94,6 +94,12 @@ class ClusterConfiguration:
             Kubernetes secret reference containing Redis password. ex: {"name": "secret-name", "key": "password-key"}
         external_storage_namespace:
             The storage namespace to use for GCS fault tolerance. By default, KubeRay sets it to the UID of RayCluster.
+        enable_ray_token_auth:
+            A boolean indicating whether to enable Ray token authentication.
+            When enabled (default), KubeRay creates a Secret with a randomly generated token
+            and sets RAY_AUTH_TOKEN and RAY_AUTH_MODE environment variables on all Ray containers.
+            Requires Ray 2.52.0+ and KubeRay v1.5.1+. Provides defense-in-depth security
+            alongside OpenShift OAuth/OIDC authentication.
     """
 
     name: str
@@ -133,6 +139,7 @@ class ClusterConfiguration:
     redis_address: Optional[str] = None
     redis_password_secret: Optional[Dict[str, str]] = None
     external_storage_namespace: Optional[str] = None
+    enable_ray_token_auth: bool = True  # Enabled by default for defense-in-depth
 
     def __post_init__(self):
         if not self.verify_tls: