|
56 | 56 | import yaml |
57 | 57 | import os |
58 | 58 | import requests |
| 59 | +import base64 |
| 60 | +import ray |
59 | 61 |
|
60 | 62 | from kubernetes import config |
61 | 63 | from kubernetes.dynamic import DynamicClient |
@@ -114,21 +116,82 @@ def _client_headers(self): |
114 | 116 | def _client_verify_tls(self): |
115 | 117 | return _is_openshift_cluster and self.config.verify_tls |
116 | 118 |
|
| 119 | + def get_ray_auth_token(self) -> Optional[str]: |
| 120 | + """ |
| 121 | + Retrieve the Ray auth token from the cluster's Secret. |
| 122 | +
|
| 123 | + When Ray token authentication is enabled (authOptions.mode: "token"), |
| 124 | + KubeRay creates a Secret with the same name as the cluster containing |
| 125 | + a randomly generated token. This method retrieves that token for use |
| 126 | + with JobSubmissionClient and ray.init(). |
| 127 | +
|
| 128 | + Returns: |
| 129 | + The Ray auth token string, or None if the Secret doesn't exist |
| 130 | + (cluster not ready or auth disabled). |
| 131 | +
|
| 132 | + Note: |
| 133 | + Requires read access to Secrets in the cluster's namespace. |
| 134 | + """ |
| 135 | + if not self.config.enable_ray_token_auth: |
| 136 | + return None |
| 137 | + |
| 138 | + secret_name = self.config.name # KubeRay uses cluster name as Secret name |
| 139 | + namespace = self.config.namespace |
| 140 | + |
| 141 | + try: |
| 142 | + config_check() |
| 143 | + core_v1 = client.CoreV1Api(get_api_client()) |
| 144 | + secret = core_v1.read_namespaced_secret(secret_name, namespace) |
| 145 | + # KubeRay stores the token under the "auth_token" key |
| 146 | + token_bytes = secret.data.get("auth_token") |
| 147 | + if token_bytes: |
| 148 | + return base64.b64decode(token_bytes).decode("utf-8") |
| 149 | + return None |
| 150 | + except ApiException as e: |
| 151 | + if e.status == 404: |
| 152 | + # Secret not created yet (cluster not ready) or auth disabled |
| 153 | + return None |
| 154 | + raise |
| 155 | + |
| 156 | + def _setup_ray_auth_env(self): |
| 157 | + """ |
| 158 | + Automatically set up Ray token authentication environment variables. |
| 159 | +
|
| 160 | + This method retrieves the Ray auth token from the KubeRay-created Secret |
| 161 | + and sets RAY_AUTH_TOKEN and RAY_AUTH_MODE environment variables. This |
| 162 | + allows users to use standard ray.init() without manual token setup. |
| 163 | +
|
| 164 | + Called automatically when users access cluster_uri() or job_client. |
| 165 | + """ |
| 166 | + if self.config.enable_ray_token_auth: |
| 167 | + ray_token = self.get_ray_auth_token() |
| 168 | + if ray_token: |
| 169 | + os.environ["RAY_AUTH_TOKEN"] = ray_token |
| 170 | + os.environ["RAY_AUTH_MODE"] = "token" |
| 171 | + |
117 | 172 | @property |
118 | 173 | def job_client(self): |
119 | | - k8client = get_api_client() |
120 | 174 | if self._job_submission_client: |
121 | 175 | return self._job_submission_client |
| 176 | + |
| 177 | + # Automatically set up Ray auth environment variables |
| 178 | + self._setup_ray_auth_env() |
| 179 | + |
| 180 | + # Start with K8s auth headers (for OAuth proxy on OpenShift) |
| 181 | + headers = {} |
122 | 182 | if _is_openshift_cluster(): |
123 | | - self._job_submission_client = JobSubmissionClient( |
124 | | - self.cluster_dashboard_uri(), |
125 | | - headers=self._client_headers, |
126 | | - verify=self._client_verify_tls, |
127 | | - ) |
128 | | - else: |
129 | | - self._job_submission_client = JobSubmissionClient( |
130 | | - self.cluster_dashboard_uri() |
131 | | - ) |
| 183 | + headers = self._client_headers.copy() |
| 184 | + |
| 185 | + # Add Ray token auth if available (defense-in-depth) |
| 186 | + ray_token = self.get_ray_auth_token() |
| 187 | + if ray_token: |
| 188 | + headers["Authorization"] = f"Bearer {ray_token}" |
| 189 | + |
| 190 | + self._job_submission_client = JobSubmissionClient( |
| 191 | + self.cluster_dashboard_uri(), |
| 192 | + headers=headers if headers else None, |
| 193 | + verify=self._client_verify_tls if _is_openshift_cluster() else True, |
| 194 | + ) |
132 | 195 | return self._job_submission_client |
133 | 196 |
|
134 | 197 | def create_resource(self): |
@@ -498,7 +561,13 @@ def details(self, print_to_console: bool = True) -> RayCluster: |
498 | 561 | def cluster_uri(self) -> str: |
499 | 562 | """ |
500 | 563 | Returns a string containing the cluster's URI. |
| 564 | +
|
| 565 | + Automatically sets up Ray token authentication environment variables |
| 566 | + (RAY_AUTH_TOKEN and RAY_AUTH_MODE) if token auth is enabled, allowing |
| 567 | + users to use standard ray.init() without manual setup. |
501 | 568 | """ |
| 569 | + # Automatically set up Ray auth environment variables |
| 570 | + self._setup_ray_auth_env() |
502 | 571 | return f"ray://{self.config.name}-head-svc.{self.config.namespace}.svc:10001" |
503 | 572 |
|
504 | 573 | def cluster_dashboard_uri(self) -> str: |
|
0 commit comments