diff --git a/src/codeflare_sdk/ray/cluster/config.py b/src/codeflare_sdk/ray/cluster/config.py index 0715070f..7759202b 100644 --- a/src/codeflare_sdk/ray/cluster/config.py +++ b/src/codeflare_sdk/ray/cluster/config.py @@ -98,9 +98,9 @@ class ClusterConfiguration: name: str namespace: Optional[str] = None - head_cpu_requests: Union[int, str] = 2 + head_cpu_requests: Union[int, str] = 1 head_cpu_limits: Union[int, str] = 2 - head_memory_requests: Union[int, str] = 8 + head_memory_requests: Union[int, str] = 5 head_memory_limits: Union[int, str] = 8 head_extended_resource_requests: Dict[str, Union[str, int]] = field( default_factory=dict @@ -109,8 +109,8 @@ class ClusterConfiguration: worker_cpu_requests: Union[int, str] = 1 worker_cpu_limits: Union[int, str] = 1 num_workers: int = 1 - worker_memory_requests: Union[int, str] = 2 - worker_memory_limits: Union[int, str] = 2 + worker_memory_requests: Union[int, str] = 3 + worker_memory_limits: Union[int, str] = 6 worker_tolerations: Optional[List[V1Toleration]] = None appwrapper: bool = False envs: Dict[str, str] = field(default_factory=dict) diff --git a/src/codeflare_sdk/ray/cluster/test_pretty_print.py b/src/codeflare_sdk/ray/cluster/test_pretty_print.py index f36e290c..d0e10585 100644 --- a/src/codeflare_sdk/ray/cluster/test_pretty_print.py +++ b/src/codeflare_sdk/ray/cluster/test_pretty_print.py @@ -85,15 +85,15 @@ def test_ray_details(mocker, capsys): name="raytest1", status=RayClusterStatus.READY, num_workers=1, - worker_mem_requests="2G", - worker_mem_limits="2G", + worker_mem_requests="3G", + worker_mem_limits="6G", worker_cpu_requests=1, worker_cpu_limits=1, namespace="ns", dashboard="fake-uri", - head_cpu_requests=2, + head_cpu_requests=1, head_cpu_limits=2, - head_mem_requests=8, + head_mem_requests=5, head_mem_limits=8, ) mocker.patch( @@ -150,7 +150,7 @@ def test_ray_details(mocker, capsys): " │ ╭── Workers ──╮ ╭───────── Worker specs(each) ─────────╮ │ \n" " │ │ # Workers │ │ Memory CPU GPU │ │ \n" " │ │ 
│ │ │ │ \n" - " │ │ 1 │ │ 2G~2G 1~1 0 │ │ \n" + " │ │ 1 │ │ 3G~6G 1~1 0 │ │ \n" " │ │ │ │ │ │ \n" " │ ╰─────────────╯ ╰──────────────────────────────────────╯ │ \n" " ╰───────────────────────────────────────────────────────────────╯ \n" @@ -168,7 +168,7 @@ def test_ray_details(mocker, capsys): " │ ╭── Workers ──╮ ╭───────── Worker specs(each) ─────────╮ │ \n" " │ │ # Workers │ │ Memory CPU GPU │ │ \n" " │ │ │ │ │ │ \n" - " │ │ 1 │ │ 2G~2G 1~1 0 │ │ \n" + " │ │ 1 │ │ 3G~6G 1~1 0 │ │ \n" " │ │ │ │ │ │ \n" " │ ╰─────────────╯ ╰──────────────────────────────────────╯ │ \n" " ╰───────────────────────────────────────────────────────────────╯ \n" @@ -184,7 +184,7 @@ def test_ray_details(mocker, capsys): "│ ╭── Workers ──╮ ╭───────── Worker specs(each) ─────────╮ │\n" "│ │ # Workers │ │ Memory CPU GPU │ │\n" "│ │ │ │ │ │\n" - "│ │ 1 │ │ 2G~2G 1~1 0 │ │\n" + "│ │ 1 │ │ 3G~6G 1~1 0 │ │\n" "│ │ │ │ │ │\n" "│ ╰─────────────╯ ╰──────────────────────────────────────╯ │\n" "╰───────────────────────────────────────────────────────────────╯\n" diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py index 8c4325d5..e19abee2 100644 --- a/src/codeflare_sdk/ray/rayjobs/rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py @@ -478,6 +478,9 @@ def _validate_priority_class(self): Raises ValueError if the priority class is definitively known not to exist. If we cannot verify (e.g., permission denied), logs a warning and allows submission. 
""" + if self._cluster_config is None: + return + if self.priority_class: logger.debug(f"Validating priority class '{self.priority_class}'...") exists = priority_class_exists(self.priority_class) diff --git a/tests/e2e_v2/__init__.py b/tests/e2e_v2/__init__.py new file mode 100644 index 00000000..cf57e1bd --- /dev/null +++ b/tests/e2e_v2/__init__.py @@ -0,0 +1,2 @@ +# E2E Test Suite v2 +# Restructured pytest-based E2E tests for CodeFlare SDK diff --git a/tests/e2e_v2/cluster_management/__init__.py b/tests/e2e_v2/cluster_management/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/configuration/__init__.py b/tests/e2e_v2/cluster_management/configuration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/configuration/test_advanced.py b/tests/e2e_v2/cluster_management/configuration/test_advanced.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/configuration/test_heterogeneous.py b/tests/e2e_v2/cluster_management/configuration/test_heterogeneous.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/configuration/test_images.py b/tests/e2e_v2/cluster_management/configuration/test_images.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/configuration/test_resources.py b/tests/e2e_v2/cluster_management/configuration/test_resources.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/configuration/test_volumes.py b/tests/e2e_v2/cluster_management/configuration/test_volumes.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/creation/__init__.py b/tests/e2e_v2/cluster_management/creation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/creation/test_cluster_creation.py 
b/tests/e2e_v2/cluster_management/creation/test_cluster_creation.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/creation/test_cluster_kueue.py b/tests/e2e_v2/cluster_management/creation/test_cluster_kueue.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/interactive/__init__.py b/tests/e2e_v2/cluster_management/interactive/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/interactive/test_in_cluster.py b/tests/e2e_v2/cluster_management/interactive/test_in_cluster.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/cluster_management/interactive/test_remote.py b/tests/e2e_v2/cluster_management/interactive/test_remote.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/conftest.py b/tests/e2e_v2/conftest.py new file mode 100644 index 00000000..d6eab7e1 --- /dev/null +++ b/tests/e2e_v2/conftest.py @@ -0,0 +1,408 @@ +""" +Pytest configuration and fixtures for E2E v2 tests. + +This module provides comprehensive fixtures for: +- Platform detection (OpenShift vs Kind) +- GPU availability detection +- Namespace and Kueue resource setup +- Authentication and credentials +- Ray image selection +- Resource configurations +""" + +import os +import pytest +from kubernetes import client, config + +from tests.e2e_v2.utils.helpers import random_suffix as _random_suffix + + +# ============================================================================= +# Platform Detection Fixtures +# ============================================================================= + + +@pytest.fixture(scope="session") +def k8s_client(): + """ + Initialize and return the Kubernetes API client. + Loads kubeconfig from the environment. 
+ """ + config.load_kube_config() + return client.CoreV1Api() + + +@pytest.fixture(scope="session") +def custom_api(k8s_client): + """Return the CustomObjectsApi for CRD operations.""" + return client.CustomObjectsApi(k8s_client.api_client) + + +@pytest.fixture(scope="session") +def is_openshift_platform(): + """ + Detect if running on OpenShift by checking for OpenShift-specific API resources. + + Returns: + bool: True if running on OpenShift, False otherwise. + """ + try: + api = client.ApiClient() + discovery = client.ApisApi(api) + groups = discovery.get_api_versions().groups + for group in groups: + if group.name == "image.openshift.io": + return True + return False + except Exception: + return False + + +@pytest.fixture(scope="session") +def is_gpu_available(k8s_client): + """ + Detect if NVIDIA GPUs are available in the cluster. + + Returns: + bool: True if GPUs are available, False otherwise. + """ + try: + nodes = k8s_client.list_node() + for node in nodes.items: + allocatable = node.status.allocatable or {} + if "nvidia.com/gpu" in allocatable: + gpu_count = allocatable.get("nvidia.com/gpu", "0") + if int(gpu_count) > 0: + return True + return False + except Exception: + return False + + +# ============================================================================= +# Credential Fixtures +# ============================================================================= + + +@pytest.fixture(scope="session") +def test_user_credentials(): + """ + Get TEST_USER credentials for most tests (ldap-admin1). + These are injected via environment variables. + + Returns: + dict: Dictionary with 'username' and 'password' keys. + """ + return { + "username": os.environ.get("TEST_USER_USERNAME", ""), + "password": os.environ.get("TEST_USER_PASSWORD", ""), + } + + +@pytest.fixture(scope="session") +def admin_user_credentials(): + """ + Get OCP_ADMIN_USER credentials for admin operations. + These are injected via environment variables. 
+ + Returns: + dict: Dictionary with 'username' and 'password' keys. + """ + return { + "username": os.environ.get("OCP_ADMIN_USER_USERNAME", ""), + "password": os.environ.get("OCP_ADMIN_USER_PASSWORD", ""), + } + + +@pytest.fixture(scope="function") +def auth_token(is_openshift_platform): + """ + Get OAuth token for OpenShift authentication. + Only applicable on OpenShift clusters. + + Returns: + str or None: The auth token if on OpenShift, None otherwise. + """ + if not is_openshift_platform: + return None + + import subprocess + + try: + result = subprocess.run( + ["oc", "whoami", "--show-token=true"], + capture_output=True, + text=True, + check=True, + ) + return result.stdout.strip() + except subprocess.CalledProcessError: + return None + + +# ============================================================================= +# Ray Image Fixtures +# ============================================================================= + + +@pytest.fixture(scope="session") +def ray_image(is_openshift_platform): + """ + Get appropriate Ray image based on platform. + + - OpenShift: Uses the CUDA runtime image (quay.io/modh/ray:...) + - Kind/K8s: Uses the standard Ray image (rayproject/ray:VERSION) + + Can be overridden via RAY_IMAGE environment variable. + + Returns: + str: The Ray image to use. 
+ """ + from codeflare_sdk.common.utils import constants + from codeflare_sdk.common.utils.utils import get_ray_image_for_python_version + + # Allow explicit override + if "RAY_IMAGE" in os.environ: + return os.environ["RAY_IMAGE"] + + if is_openshift_platform: + return get_ray_image_for_python_version() + else: + return f"rayproject/ray:{constants.RAY_VERSION}" + + +# ============================================================================= +# Resource Configuration Fixtures +# ============================================================================= + + +@pytest.fixture(scope="session") +def platform_resources(is_openshift_platform): + """ + Get appropriate resource configurations based on platform. + OpenShift with MODH images requires more memory than Kind with standard Ray images. + + Returns: + dict: Resource configurations with head and worker CPU/memory settings. + """ + if is_openshift_platform: + return { + "head_cpu_requests": "1", + "head_cpu_limits": "1.5", + "head_memory_requests": 7, + "head_memory_limits": 8, + "worker_cpu_requests": "1", + "worker_cpu_limits": "1", + "worker_memory_requests": 5, + "worker_memory_limits": 6, + } + else: + return { + "head_cpu_requests": "1", + "head_cpu_limits": "1.5", + "head_memory_requests": 7, + "head_memory_limits": 8, + "worker_cpu_requests": "1", + "worker_cpu_limits": "1", + "worker_memory_requests": 2, + "worker_memory_limits": 3, + } + + +# ============================================================================= +# Namespace Fixtures +# ============================================================================= + + +@pytest.fixture(scope="function") +def test_namespace(k8s_client): + """ + Create a unique test namespace and clean it up after the test. + + Yields: + str: The namespace name. 
+ """ + namespace_name = f"test-ns-{_random_suffix()}" + + namespace_body = client.V1Namespace( + metadata=client.V1ObjectMeta(name=namespace_name) + ) + k8s_client.create_namespace(namespace_body) + + yield namespace_name + + # Cleanup + try: + k8s_client.delete_namespace(namespace_name) + except Exception: + pass + + +@pytest.fixture(scope="function") +def test_namespace_with_kueue(k8s_client, custom_api, test_namespace): + """ + Create a test namespace with Kueue resources (ResourceFlavor, ClusterQueue, LocalQueue). + + Yields: + dict: Dictionary with namespace, resource_flavors, cluster_queues, local_queues. + """ + from tests.e2e_v2.utils.kueue import ( + create_resource_flavor, + create_cluster_queue, + create_local_queue, + delete_kueue_resources, + ) + + resource_flavor_name = f"test-flavor-{_random_suffix()}" + cluster_queue_name = f"test-cq-{_random_suffix()}" + local_queue_name = f"test-lq-{_random_suffix()}" + + # Create Kueue resources + create_resource_flavor(custom_api, resource_flavor_name) + create_cluster_queue(custom_api, cluster_queue_name, resource_flavor_name) + create_local_queue(custom_api, test_namespace, cluster_queue_name, local_queue_name) + + result = { + "namespace": test_namespace, + "resource_flavors": [resource_flavor_name], + "cluster_queues": [cluster_queue_name], + "local_queues": [local_queue_name], + } + + yield result + + # Cleanup Kueue resources + delete_kueue_resources( + custom_api, + cluster_queues=[cluster_queue_name], + resource_flavors=[resource_flavor_name], + ) + + +# ============================================================================= +# Environment Variables Fixture +# ============================================================================= + + +@pytest.fixture(scope="function") +def setup_env_variables(): + """ + Get environment variables for test setup (PIP_INDEX_URL, AWS credentials, etc.). + + Returns: + dict: Environment variables for test setup. 
+ """ + env_vars = {} + + # PIP configuration + if os.environ.get("PIP_INDEX_URL"): + env_vars["PIP_INDEX_URL"] = os.environ.get("PIP_INDEX_URL") + env_vars["PIP_TRUSTED_HOST"] = os.environ.get("PIP_TRUSTED_HOST", "") + else: + env_vars["PIP_INDEX_URL"] = "https://pypi.org/simple/" + env_vars["PIP_TRUSTED_HOST"] = "pypi.org" + + # AWS/S3 configuration + if os.environ.get("AWS_DEFAULT_ENDPOINT"): + env_vars["AWS_DEFAULT_ENDPOINT"] = os.environ.get("AWS_DEFAULT_ENDPOINT") + env_vars["AWS_ACCESS_KEY_ID"] = os.environ.get("AWS_ACCESS_KEY_ID", "") + env_vars["AWS_SECRET_ACCESS_KEY"] = os.environ.get("AWS_SECRET_ACCESS_KEY", "") + env_vars["AWS_STORAGE_BUCKET"] = os.environ.get("AWS_STORAGE_BUCKET", "") + env_vars["AWS_STORAGE_BUCKET_MNIST_DIR"] = os.environ.get( + "AWS_STORAGE_BUCKET_MNIST_DIR", "" + ) + + return env_vars + + +# ============================================================================= +# In-Cluster Test Execution Fixtures +# ============================================================================= + + +@pytest.fixture(scope="function") +def in_cluster_service_account(k8s_client, custom_api, test_namespace): + """ + Create a service account with RBAC permissions for in-cluster test execution. + + This fixture automatically sets up a service account with permissions to + create/manage RayJobs, RayClusters, and related resources. Useful for tests + that need to run code inside pods that interact with the Kubernetes API. + + Yields: + str: The service account name to use in pod creation. 
+ + Example: + def test_something(in_cluster_service_account): + result = run_code_in_pod( + api_instance=k8s_client, + namespace=test_namespace, + code="...", + service_account=in_cluster_service_account, + ) + """ + from tests.e2e_v2.utils.in_cluster import ( + setup_in_cluster_test_environment, + cleanup_in_cluster_test_environment, + ) + + service_account_name = setup_in_cluster_test_environment( + api_instance=k8s_client, + custom_api=custom_api, + namespace=test_namespace, + name_prefix="test-pod", + ) + + yield service_account_name + + # Cleanup + cleanup_in_cluster_test_environment( + api_instance=k8s_client, + custom_api=custom_api, + namespace=test_namespace, + service_account_name=service_account_name, + ) + + +# ============================================================================= +# Test Control Fixtures +# ============================================================================= + + +@pytest.fixture +def require_gpu_flag(request, num_gpus): + """ + Skip GPU tests unless explicitly run with -m "gpu". + + If the current parameter requires GPUs (num_gpus > 0), + skip the test unless the user explicitly ran with -m "gpu". + This allows CPU tests to run by default. + """ + if num_gpus > 0: + m_flag = str(request.config.getoption("-m")) + if "gpu" not in m_flag: + pytest.skip( + "Skipping GPU config (default is CPU). Run with -m 'gpu' to enable." 
+ ) + + +# ============================================================================= +# Skip Markers +# ============================================================================= + + +def pytest_configure(config): + """Register custom markers.""" + config.addinivalue_line("markers", "kind: mark test to run only on Kind clusters") + config.addinivalue_line( + "markers", "openshift: mark test to run only on OpenShift clusters" + ) + config.addinivalue_line("markers", "gpu: mark test to require GPU resources") + config.addinivalue_line( + "markers", "tier1: mark test as tier1 (standard test suite)" + ) + config.addinivalue_line( + "markers", "smoke: mark test as smoke test (quick validation)" + ) + config.addinivalue_line("markers", "pre_upgrade: mark test to run before upgrade") + config.addinivalue_line("markers", "post_upgrade: mark test to run after upgrade") diff --git a/tests/e2e_v2/job_submission/__init__.py b/tests/e2e_v2/job_submission/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/job_submission/rayjob_client/__init__.py b/tests/e2e_v2/job_submission/rayjob_client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/job_submission/rayjob_client/test_in_cluster.py b/tests/e2e_v2/job_submission/rayjob_client/test_in_cluster.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/job_submission/rayjob_client/test_remote.py b/tests/e2e_v2/job_submission/rayjob_client/test_remote.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/job_submission/rayjob_cr/__init__.py b/tests/e2e_v2/job_submission/rayjob_cr/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/job_submission/rayjob_cr/test_existing_cluster.py b/tests/e2e_v2/job_submission/rayjob_cr/test_existing_cluster.py new file mode 100644 index 00000000..8580342c --- /dev/null +++ b/tests/e2e_v2/job_submission/rayjob_cr/test_existing_cluster.py @@ -0,0 +1,206 @@ 
+""" +Test RayJob CR submission to existing clusters. +""" + +import textwrap + +import pytest + +from codeflare_sdk import Cluster, ClusterConfiguration, RayJob +from codeflare_sdk.common.kubernetes_cluster.auth import TokenAuthentication + +from ...utils.support import ( + create_kueue_resources, + delete_kueue_resources, + initialize_kubernetes_client, + create_namespace, + delete_namespace, + get_ray_image, + run_oc_command, +) +from ...utils.helpers import wait_for_job_finished, get_job_status, assert_job_succeeded +from ...utils.in_cluster import run_code_in_pod + + +class TestRayJobCRExistingCluster: + CPU_CONFIG = 0 + GPU_CONFIG = pytest.param(1, marks=pytest.mark.gpu) + + def setup_method(self): + initialize_kubernetes_client(self) + + def teardown_method(self): + delete_kueue_resources(self) + delete_namespace(self) + + @pytest.mark.openshift + @pytest.mark.parametrize("num_gpus", [CPU_CONFIG, GPU_CONFIG]) + def test_openshift_remote_submission(self, num_gpus, require_gpu_flag): + auth = TokenAuthentication( + token=run_oc_command(["whoami", "--show-token=true"]), + server=run_oc_command(["whoami", "--show-server=true"]), + skip_tls=True, + ) + auth.login() + + self.run_remote_submission(num_gpus=num_gpus) + + @pytest.mark.kind + @pytest.mark.parametrize("num_gpus", [CPU_CONFIG, GPU_CONFIG]) + def test_kind_remote_submission(self, num_gpus, require_gpu_flag): + self.run_remote_submission(num_gpus=num_gpus) + + @pytest.mark.openshift + @pytest.mark.parametrize("num_gpus", [CPU_CONFIG, GPU_CONFIG]) + def test_openshift_in_cluster_submission(self, num_gpus, require_gpu_flag): + auth = TokenAuthentication( + token=run_oc_command(["whoami", "--show-token=true"]), + server=run_oc_command(["whoami", "--show-server=true"]), + skip_tls=True, + ) + auth.login() + + self.run_in_cluster_submission(num_gpus=num_gpus) + + @pytest.mark.kind + @pytest.mark.parametrize("num_gpus", [CPU_CONFIG, GPU_CONFIG]) + def test_kind_in_cluster_submission(self, num_gpus, 
require_gpu_flag): + self.run_in_cluster_submission(num_gpus=num_gpus) + + def run_remote_submission(self, num_gpus): + create_namespace(self) + create_kueue_resources(self) + + cluster_name = "existing-cluster" + + cluster = Cluster( + ClusterConfiguration( + name=cluster_name, + namespace=self.namespace, + num_workers=1, + head_extended_resource_requests={"nvidia.com/gpu": num_gpus}, + worker_extended_resource_requests={"nvidia.com/gpu": num_gpus}, + image=get_ray_image(), + local_queue=self.local_queues[0], + verify_tls=False, + ) + ) + + if num_gpus > 0: + entrypoint = "python tests/e2e_v2/utils/scripts/gpu_script.py" + else: + entrypoint = "python tests/e2e_v2/utils/scripts/cpu_script.py" + + jobs = [] + try: + cluster.apply() + cluster.wait_ready() + + for i in range(3): + rayjob = RayJob( + job_name=f"remote-job-{i}", + cluster_name=cluster_name, + namespace=self.namespace, + entrypoint=entrypoint, + ) + rayjob.submit() + jobs.append(rayjob) + + # Wait for all jobs to finish + for job in jobs: + assert wait_for_job_finished( + job_name=job.name, namespace=self.namespace, timeout=300 + ), f"RayJob '{job.name}' did not finish within timeout" + + # Verify all jobs succeeded + for job in jobs: + status = get_job_status(job.name, self.namespace) + assert_job_succeeded(status, job.name) + + finally: + for job in jobs: + try: + job.delete() + except Exception: + pass + cluster.down() + + def run_in_cluster_submission(self, num_gpus): + create_namespace(self) + create_kueue_resources(self) + + cluster_name = "rayjob-cr-in-cluster" + + cluster = Cluster( + ClusterConfiguration( + name=cluster_name, + namespace=self.namespace, + num_workers=1, + head_extended_resource_requests={"nvidia.com/gpu": num_gpus}, + worker_extended_resource_requests={"nvidia.com/gpu": num_gpus}, + image=get_ray_image(), + local_queue=self.local_queues[0], + verify_tls=False, + ) + ) + + try: + cluster.apply() + cluster.wait_ready() + + job_name_in_cluster = "in-cluster-rayjob-cr" + 
entrypoint_cmd = ( + "python -c \"import ray; ray.init(); print('IN_CLUSTER_CR_SUCCESS')\"" + ) + + in_cluster_code = textwrap.dedent( + f""" + from codeflare_sdk import RayJob + from time import sleep + + entrypoint = {entrypoint_cmd!r} + rayjob = RayJob( + job_name="{job_name_in_cluster}", + cluster_name="{cluster_name}", + namespace="{self.namespace}", + entrypoint=entrypoint, + ) + + rayjob.submit() + + timeout, elapsed = 120, 0 + while elapsed < timeout: + status, _ = rayjob.status(print_to_console=False) + if status.name in ["COMPLETE", "FAILED"]: + break + sleep(5) + elapsed += 5 + + final_status, _ = rayjob.status(print_to_console=False) + rayjob.delete() + + if final_status.name == "COMPLETE": + print("IN_CLUSTER_CR_SUBMISSION_PASSED") + """ + ) + + result = run_code_in_pod( + api_instance=self.api_instance, + namespace=self.namespace, + code=in_cluster_code, + image=get_ray_image(), + pip_packages=["codeflare-sdk"], + timeout=300, + auto_setup_rbac=True, + custom_api=self.custom_api, + ) + + assert ( + result.succeeded + ), f"In-cluster submission failed. Logs: {result.logs}" + assert ( + "IN_CLUSTER_CR_SUBMISSION_PASSED" in result.logs + ), f"In-cluster submission did not pass. 
Logs: {result.logs}" + + finally: + cluster.down() diff --git a/tests/e2e_v2/job_submission/rayjob_cr/test_lifecycled_cluster.py b/tests/e2e_v2/job_submission/rayjob_cr/test_lifecycled_cluster.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/kueue_integration/__init__.py b/tests/e2e_v2/kueue_integration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/kueue_integration/test_admission.py b/tests/e2e_v2/kueue_integration/test_admission.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/kueue_integration/test_queueing.py b/tests/e2e_v2/kueue_integration/test_queueing.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/kueue_integration/test_resource_flavors.py b/tests/e2e_v2/kueue_integration/test_resource_flavors.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/security/__init__.py b/tests/e2e_v2/security/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/security/test_mtls.py b/tests/e2e_v2/security/test_mtls.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/security/test_network_policies.py b/tests/e2e_v2/security/test_network_policies.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e_v2/ui/README.md b/tests/e2e_v2/ui/README.md new file mode 100644 index 00000000..dd150a4d --- /dev/null +++ b/tests/e2e_v2/ui/README.md @@ -0,0 +1,225 @@ +# RHOAI Dashboard UI Tests + +This directory contains UI tests for the RHOAI Dashboard, specifically targeting the Distributed Workloads page to verify Ray cluster visibility and functionality. + +## Overview + +The UI tests use Selenium WebDriver with Chrome (headless) to automate browser interactions with the RHOAI Dashboard. They are designed to work in conjunction with the upgrade tests to verify that Ray clusters created before an upgrade remain visible and functional after the upgrade. 
+ +## Test Structure + +``` +tests/ui/ +├── conftest.py # Pytest fixtures for Selenium setup +├── pages/ +│ └── distributed_workloads_page.py # Page Object Model for Distributed Workloads page +└── README.md # This file + +tests/upgrade/ +├── 01_raycluster_sdk_upgrade_test.py # Pre/post upgrade backend tests (runs first) +├── 02_dashboard_ui_upgrade_test.py # Pre/post upgrade UI tests (runs second) +└── conftest.py # Imports UI fixtures for upgrade tests +``` + +**Note**: Test files are prefixed with numbers (`01_`, `02_`) to ensure proper execution order: +1. First, the Ray cluster is created (`01_raycluster_sdk_upgrade_test.py`) +2. Then, the UI tests verify the cluster appears in the dashboard (`02_dashboard_ui_upgrade_test.py`) + +## Prerequisites + +### Python Dependencies + +The UI tests require the following dependencies (already added to `pyproject.toml`): + +- `selenium >= 4.27.1` - Browser automation framework +- `webdriver-manager >= 4.0.2` - Automatic ChromeDriver management + +Install dependencies: +```bash +poetry install --with test +``` + +### System Requirements + +- **Chrome or Chromium browser** (required for headless execution) + - The Docker image includes Google Chrome Stable + - If running locally, ensure Chrome is installed + - UI tests will be skipped if Chrome is not available +- OpenShift CLI (`oc`) installed and configured +- Access to RHOAI Dashboard + +### Environment Variables + +The tests require the following environment variables: + +- `TEST_USER_USERNAME` - Username for RHOAI Dashboard login +- `TEST_USER_PASSWORD` - Password for RHOAI Dashboard login +- `ODH_DASHBOARD_URL` (optional) - Dashboard URL (auto-detected via `oc get consolelink rhodslink` if not set) +- `OPENSHIFT_IDP_NAME` (optional) - OpenShift identity provider name (e.g., "ldap", "htpasswd"). 
If not set, the fixture will try to auto-detect based on username pattern + +## Running the Tests + +### Run Pre-Upgrade UI Tests + +```bash +# Run all pre-upgrade tests including UI tests +poetry run pytest tests/upgrade/ -m pre_upgrade -v + +# Run only pre-upgrade UI tests +poetry run pytest tests/upgrade/ -m "pre_upgrade and ui" -v +``` + +### Run Post-Upgrade UI Tests + +```bash +# Run all post-upgrade tests including UI tests +poetry run pytest tests/upgrade/ -m post_upgrade -v + +# Run only post-upgrade UI tests +poetry run pytest tests/upgrade/ -m "post_upgrade and ui" -v +``` + +### Run All Upgrade Tests (Pre and Post) + +```bash +poetry run pytest tests/upgrade/ -m "pre_upgrade or post_upgrade" -v +``` + +### Skip UI Tests + +If you want to run upgrade tests but skip UI tests (e.g., if browser is not available): + +```bash +poetry run pytest tests/upgrade/ -m "pre_upgrade and not ui" -v +``` + +## Test Flow + +### Pre-Upgrade (`TestDistributedWorkloadsUIPreUpgrade`) + +1. Login to RHOAI Dashboard +2. Navigate to Distributed Workloads page +3. Select the test namespace (`test-ns-rayupgrade`) +4. Verify cluster is in "Running" state +5. Check Project Metrics tab shows resource metrics +6. Check Workload Status tab shows cluster with Running status + +### Post-Upgrade (`TestDistributedWorkloadsUIPostUpgrade`) + +1. Login to RHOAI Dashboard +2. Navigate to Distributed Workloads page +3. Select the test namespace (`test-ns-rayupgrade`) +4. Verify cluster is still in "Running" state after upgrade +5. Check Project Metrics tab still shows resource metrics +6. Check Workload Status tab still shows cluster with Running status + +## Page Object Model + +The tests use the Page Object Model (POM) design pattern to separate test logic from page interactions. The `DistributedWorkloadsPage` class encapsulates all interactions with the Distributed Workloads page. 
+ +### Key Methods + +- `navigate()` - Navigate to the Distributed Workloads page +- `select_project(project_name)` - Select a project from the dropdown +- `verify_cluster_running()` - Check if any cluster shows "Running" status +- `click_project_metrics_tab()` - Switch to Project Metrics tab +- `verify_metrics_visible()` - Verify resource metrics are displayed +- `click_workload_status_tab()` - Switch to Workload Status tab +- `verify_cluster_in_workload_list(cluster_name)` - Verify cluster appears in list with Running status + +## Debugging + +### Enable Screenshots on Failure + +Screenshots are automatically saved to `/tmp/login_failure.png` if login fails. To capture screenshots on other failures, you can add: + +```python +try: + driver.save_screenshot("/tmp/test_failure.png") +except: + pass +``` + +### Run in Non-Headless Mode + +To see the browser during test execution (useful for debugging), modify `tests/ui/conftest.py`: + +```python +# Comment out this line: +# chrome_options.add_argument("--headless") +``` + +### Verbose Logging + +All page interactions print status messages. 
Run tests with `-s` flag to see them: + +```bash +poetry run pytest tests/upgrade/02_dashboard_ui_upgrade_test.py -m pre_upgrade -v -s +``` + +## Troubleshooting + +### ChromeDriver Issues + +If you encounter ChromeDriver compatibility issues: + +```bash +# Clear webdriver-manager cache +rm -rf ~/.wdm/ + +# Or manually specify ChromeDriver version +# Edit conftest.py and modify: +service = Service(ChromeDriverManager(driver_version="specific_version").install()) +``` + +### Login Issues + +- Verify `TEST_USER_USERNAME` and `TEST_USER_PASSWORD` are set +- Check that the user has access to the RHOAI Dashboard +- Ensure the cluster's OAuth is properly configured + +**Identity Provider (IDP) Selection**: +- The fixture automatically tries to select the correct IDP based on your username + - Usernames containing "ldap" → selects LDAP IDP + - Usernames containing "htpasswd" → selects htpasswd IDP +- If auto-detection fails, set `OPENSHIFT_IDP_NAME` environment variable: + ```bash + export OPENSHIFT_IDP_NAME="ldap" # or "htpasswd", "kube:admin", etc. + ``` +- Check screenshot at `/tmp/login_failure.png` to see available IDPs + +### Dashboard URL Not Found + +If `oc get consolelink rhodslink` fails: + +```bash +# Manually check available consolelinks +oc get consolelink + +# Set URL manually +export ODH_DASHBOARD_URL="https://your-dashboard-url" +``` + +### Timeout Issues + +If elements are not found within the default 30-second timeout, you can adjust it: + +```python +dw_page = DistributedWorkloadsPage(driver, timeout=60) +``` + +## Integration with CI/CD + +The UI tests are designed to run in the same container as the other tests. The `run-tests.sh` script automatically: + +1. Retrieves the Dashboard URL via `oc get consolelink rhodslink` +2. Uses the same `TEST_USER_USERNAME` and `TEST_USER_PASSWORD` credentials +3. 
Runs UI tests alongside other upgrade tests when appropriate markers are specified + +## Future Enhancements + +- Add video recording of test execution +- Implement retry logic for flaky UI interactions +- Add cross-browser testing (Firefox, Edge) +- Expand coverage to other RHOAI Dashboard pages +- Add performance metrics collection diff --git a/tests/e2e_v2/ui/__init__.py b/tests/e2e_v2/ui/__init__.py new file mode 100644 index 00000000..f1db51e6 --- /dev/null +++ b/tests/e2e_v2/ui/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 IBM, Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/e2e_v2/ui/conftest.py b/tests/e2e_v2/ui/conftest.py new file mode 100644 index 00000000..176dd7d5 --- /dev/null +++ b/tests/e2e_v2/ui/conftest.py @@ -0,0 +1,369 @@ +# Copyright 2024 IBM, Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import os
import subprocess
import time
from urllib.parse import urlsplit

import pytest
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager


# Locators whose *displayed* presence is taken as proof that the dashboard
# (rather than a login page) is showing.
# NOTE(review): the <title> locator is kept for parity with the original list,
# but <title> lives in <head> and is unlikely to ever report is_displayed().
_DASHBOARD_INDICATORS = [
    (By.XPATH, "//h1[contains(text(), 'Applications')]"),
    (By.XPATH, "//*[contains(text(), 'Data Science')]"),
    (By.XPATH, "//a[contains(@href, 'distributed-workloads')]"),
    (By.CSS_SELECTOR, "[data-id='distributed-workloads']"),
    # Red Hat OpenShift AI specific indicators
    (By.XPATH, "//title[contains(text(), 'Red Hat OpenShift AI')]"),
    (By.XPATH, "//a[contains(@href, 'applications')]"),
    (By.CSS_SELECTOR, "nav[aria-label='Global navigation']"),
    (By.CSS_SELECTOR, "[class*='odh-dashboard']"),
    (By.CSS_SELECTOR, "[class*='app-launcher']"),
]

# Candidate links on the OpenShift OAuth "choose an identity provider" page.
_IDP_LINK_LOCATORS = [
    (By.XPATH, "//a[contains(@href, 'oauth/authorize')]"),
    (By.XPATH, "//div[@data-test-id='login']//a"),
    (By.XPATH, "//div[contains(@class, 'login')]//a"),
]

# Link texts (lower-cased) that mark an anchor as an identity-provider choice.
_IDP_KEYWORDS = ("htpasswd", "ldap", "login", "log in", "sign in", "kube")

# Username / password inputs vary with the OAuth provider in use.
_USERNAME_FIELD_LOCATORS = [
    (By.ID, "inputUsername"),
    (By.ID, "username"),
    (By.ID, "login"),
    (By.NAME, "username"),
    (By.NAME, "login"),
    (By.CSS_SELECTOR, "input[type='text'][name='username']"),
    (By.CSS_SELECTOR, "input[type='text'][name='login']"),
]

_PASSWORD_FIELD_LOCATORS = [
    (By.ID, "inputPassword"),
    (By.ID, "password"),
    (By.NAME, "password"),
    (By.CSS_SELECTOR, "input[type='password']"),
]

_LOGIN_FAILURE_SCREENSHOT = "/tmp/login_failure.png"


def _quit_quietly(driver):
    """Quit *driver*, reporting (not raising) any error so teardown stays best-effort."""
    try:
        driver.quit()
    except Exception as e:
        print(f"Warning: failed to quit WebDriver cleanly: {e}")


@pytest.fixture(scope="class")
def selenium_driver(request):
    """Provide a headless Chrome WebDriver for UI tests.

    The test is skipped when Chrome/ChromeDriver cannot be started. When the
    requesting test is a class, the driver is also exposed as ``cls.driver``.
    The driver is always quit at teardown.
    """
    chrome_options = Options()
    for argument in (
        "--headless",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--ignore-certificate-errors",
        "--window-size=1920,1080",
        "--disable-gpu",
    ):
        chrome_options.add_argument(argument)

    # Only driver *creation* is allowed to turn into a skip; keeping the
    # ``yield`` out of this try block stops teardown errors from being
    # misreported as "Chrome not available".
    driver = None
    try:
        # Use webdriver-manager to automatically manage chromedriver
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=chrome_options)
        driver.implicitly_wait(10)
    except Exception as e:
        if driver is not None:
            # Browser started but configuration failed: do not leak it.
            _quit_quietly(driver)
        pytest.skip(
            f"Chrome/ChromeDriver not available, skipping UI test: {e}\n"
            "To run UI tests, ensure Chrome is installed in the container."
        )

    # Make driver available to the test class
    if request.cls is not None:
        request.cls.driver = driver

    try:
        yield driver
    finally:
        _quit_quietly(driver)


@pytest.fixture(scope="class")
def dashboard_url():
    """Return the RHOAI Dashboard URL.

    ``ODH_DASHBOARD_URL`` wins when set; otherwise the URL is read from the
    ``rhodslink`` consolelink via ``oc``.

    Raises:
        RuntimeError: if ``oc`` is missing, the command fails, or it yields
            an empty URL.
    """
    url = os.getenv("ODH_DASHBOARD_URL")
    if url:
        print(f"Using Dashboard URL from environment: {url}")
        return url

    try:
        result = subprocess.run(
            ["oc", "get", "consolelink", "rhodslink", "-o", "jsonpath='{.spec.href}'"],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Failed to get Dashboard URL from oc command: {e}")
        raise RuntimeError(
            "ODH_DASHBOARD_URL not set and failed to retrieve from oc command"
        ) from e
    except FileNotFoundError as e:
        raise RuntimeError(
            "oc command not found. Please ensure OpenShift CLI is installed or set ODH_DASHBOARD_URL environment variable"
        ) from e

    # No shell is involved, so the quotes in the jsonpath argument reach oc
    # literally and are echoed back around the value; strip them.
    url = result.stdout.strip().strip("'")
    if not url:
        raise RuntimeError(
            "ODH_DASHBOARD_URL not set and oc returned an empty Dashboard URL"
        )
    print(f"Retrieved Dashboard URL from oc command: {url}")
    return url


@pytest.fixture(scope="class")
def test_credentials():
    """Return ``{"username": ..., "password": ...}`` read from the environment.

    Raises:
        RuntimeError: if ``TEST_USER_USERNAME`` or ``TEST_USER_PASSWORD`` is
            unset or empty.
    """
    username = os.getenv("TEST_USER_USERNAME")
    password = os.getenv("TEST_USER_PASSWORD")

    if not username or not password:
        raise RuntimeError(
            "TEST_USER_USERNAME and TEST_USER_PASSWORD must be set in environment"
        )

    return {"username": username, "password": password}


def _find_dashboard_indicator(driver):
    """Return the first dashboard locator that resolves to a displayed element, else None."""
    for locator in _DASHBOARD_INDICATORS:
        try:
            if driver.find_element(*locator).is_displayed():
                return locator
        except Exception:
            continue
    return None


def _collect_idp_options(driver):
    """Return de-duplicated ``(element, text, href)`` tuples for identity-provider links."""
    options = []
    # The locators overlap, so the same link can be found more than once;
    # duplicates would defeat the "only one IDP available" rule.
    seen = set()
    for by, value in _IDP_LINK_LOCATORS:
        try:
            for elem in driver.find_elements(by, value):
                text = elem.text or ""
                href = elem.get_attribute("href") or ""
                lowered = text.lower()
                is_idp = "authorize" in href or any(
                    keyword in lowered for keyword in _IDP_KEYWORDS
                )
                if not is_idp or (text, href) in seen:
                    continue
                seen.add((text, href))
                options.append((elem, text, href))
                print(f"Found IDP option: text='{text}', href='{href[:100]}'")
        except Exception:
            continue
    return options


def _choose_idp(idp_options, username):
    """Pick the identity provider to click; return ``(element, text)`` or None.

    Order: username pattern, then ``OPENSHIFT_IDP_NAME``, then "the only one".
    """

    def first_match(keyword):
        for elem, text, href in idp_options:
            if keyword in text.lower() or keyword in href.lower():
                return elem, text
        return None

    username = username.lower()
    selected = None

    # Strategy 1: Match username pattern to IDP name
    if "ldap" in username:
        selected = first_match("ldap")
        if selected:
            print(f"Selected LDAP IDP based on username pattern: {selected[1]}")
    elif "htpasswd" in username or "admin" in username:
        selected = first_match("htpasswd")
        if selected:
            print(f"Selected htpasswd IDP based on username pattern: {selected[1]}")

    # Strategy 2: If no match, use environment variable if set
    if not selected:
        idp_name = os.getenv("OPENSHIFT_IDP_NAME", "").lower()
        if idp_name:
            selected = first_match(idp_name)
            if selected:
                print(f"Selected IDP from environment variable: {selected[1]}")

    # Strategy 3: If still no match and only one IDP, use it
    if not selected and len(idp_options) == 1:
        selected = (idp_options[0][0], idp_options[0][1])
        print(f"Only one IDP available, using: {selected[1]}")

    return selected


def _select_identity_provider(driver, username):
    """Click through the IDP selection page when one is shown (best-effort)."""
    try:
        print("Checking for identity provider selection page...")
        idp_options = _collect_idp_options(driver)
        if not idp_options:
            return

        selected = _choose_idp(idp_options, username)
        if not selected:
            # Some clusters may not require IDP selection if there's a default
            print(
                f"Multiple IDPs found but couldn't determine which to use. Available: {[text for _, text, _ in idp_options]}"
            )
            print("Skipping IDP selection, will try direct login form")
            return

        print(f"Clicking identity provider button: {selected[1]}")
        selected[0].click()
        time.sleep(3)  # Wait for redirect to login form
        print(f"After IDP click - URL: {driver.current_url}")
    except Exception as e:
        print(f"No identity provider selection needed or failed to handle: {e}")


def _find_username_field(driver):
    """Return the username input, waiting up to 5 s per candidate locator, else None."""
    for by, value in _USERNAME_FIELD_LOCATORS:
        try:
            field = WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((by, value))
            )
        except Exception:
            continue
        print(f"Found username field using: {by}={value}")
        return field
    return None


def _find_password_field(driver):
    """Return the password input using the first locator that resolves, else None."""
    for by, value in _PASSWORD_FIELD_LOCATORS:
        try:
            field = driver.find_element(by, value)
        except Exception:
            continue
        print(f"Found password field using: {by}={value}")
        return field
    return None


def _is_login_url(url):
    """Return True when *url* looks like an OAuth / login page rather than the dashboard.

    NOTE(review): heuristic; assumes OpenShift OAuth pages have "oauth" in
    the host or path, or "/login" in the path. Confirm against the target
    cluster.
    """
    parts = urlsplit(url)
    host = parts.netloc.lower()
    path = parts.path.lower()
    return "oauth" in host or "/oauth" in path or "/login" in path


def _wait_for_dashboard(driver, attempts=6, interval=5):
    """Poll until the dashboard is showing; return True on success, False on timeout."""
    for i in range(attempts):
        time.sleep(interval)
        current_url = driver.current_url
        title = driver.title
        print(f"Attempt {i+1}/{attempts} - Current URL: {current_url}")
        print(f"Attempt {i+1}/{attempts} - Page title: {title}")

        # A title containing "OpenShift" is only trusted once the browser has
        # left the login flow; presumably the OAuth login page title also
        # contains "OpenShift", which would report a failed login as success.
        if "OpenShift" in title and not _is_login_url(current_url):
            print(f"Dashboard loaded successfully (title: {title})")
            return True

        locator = _find_dashboard_indicator(driver)
        if locator is not None:
            print(f"Dashboard loaded successfully (found: {locator})")
            return True
    return False


def _capture_login_failure(driver):
    """Save a screenshot and print a page-source preview for debugging (best-effort)."""
    try:
        driver.save_screenshot(_LOGIN_FAILURE_SCREENSHOT)
        print(f"Screenshot saved to: {_LOGIN_FAILURE_SCREENSHOT}")
    except Exception as e:
        print(f"Could not save screenshot: {e}")

    try:
        print(f"\nPage source preview:\n{driver.page_source[:1000]}")
    except Exception as e:
        print(f"Could not read page source: {e}")


@pytest.fixture(scope="class")
def login_to_dashboard(selenium_driver, dashboard_url, test_credentials):
    """Open the RHOAI Dashboard and log in when a login page is shown.

    Returns the driver positioned on the dashboard.

    Raises:
        RuntimeError: if the username/password fields cannot be found or the
            dashboard does not appear after submitting the form. Any other
            WebDriver error is re-raised after debug artifacts are captured.
    """
    driver = selenium_driver

    print(f"Navigating to dashboard at: {dashboard_url}")
    driver.get(dashboard_url)

    # Give page time to load
    time.sleep(3)

    try:
        print(f"Current URL after navigation: {driver.current_url}")
        print(f"Page title: {driver.title}")

        # First, check if we're already on the dashboard (no login required)
        locator = _find_dashboard_indicator(driver)
        if locator is not None:
            print(f"Already on dashboard, no login required (found: {locator})")
            return driver

        # Not on dashboard, try to login
        print("Dashboard not found, attempting login...")
        _select_identity_provider(driver, test_credentials["username"])

        username_field = _find_username_field(driver)
        if username_field is None:
            print("ERROR: Could not find username field")
            print(f"Page source preview (first 500 chars):\n{driver.page_source[:500]}")
            raise RuntimeError(
                "Could not find username field. "
                f"Current URL: {driver.current_url}, "
                f"Page title: {driver.title}"
            )

        username_field.send_keys(test_credentials["username"])
        print(f"Entered username: {test_credentials['username']}")

        password_field = _find_password_field(driver)
        if password_field is None:
            raise RuntimeError("Could not find password field")

        password_field.send_keys(test_credentials["password"])
        print("Entered password")

        login_button = driver.find_element(By.CSS_SELECTOR, "button[type='submit']")
        login_button.click()
        print("Clicked login button")

        print("Waiting for dashboard to load...")
        if not _wait_for_dashboard(driver):
            raise RuntimeError(
                "Dashboard did not load after login. "
                f"Final URL: {driver.current_url}, "
                f"Page title: {driver.title}"
            )

        print("Successfully logged in to RHOAI Dashboard")

    except Exception as e:
        print(f"Login failed: {e}")
        _capture_login_failure(driver)
        raise

    return driver


# Patch boundary: the lines below belong to the next files in this patch and
# are carried over unchanged (diff headers kept as comments).
# diff --git a/tests/e2e_v2/ui/pages/__init__.py b/tests/e2e_v2/ui/pages/__init__.py
# new file mode 100644
# index 00000000..f1db51e6
# --- /dev/null
# +++ b/tests/e2e_v2/ui/pages/__init__.py
# @@ -0,0 +1,13 @@
# Copyright 2024 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# diff --git a/tests/e2e_v2/ui/pages/distributed_workloads_page.py b/tests/e2e_v2/ui/pages/distributed_workloads_page.py
# new file mode 100644
# index 00000000..473761b9
# --- /dev/null
# +++ b/tests/e2e_v2/ui/pages/distributed_workloads_page.py
# @@ -0,0 +1,610 @@
# Copyright 2024 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import traceback
from urllib.parse import urlsplit

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


class DistributedWorkloadsPage:
    """Page Object Model for RHOAI Workload Metrics page.

    Every lookup tries several locators in turn so the same page object works
    across dashboard/PatternFly versions. Methods print progress messages and
    save a screenshot under ``/tmp`` when something cannot be found.
    """

    # Locators - multiple options for better compatibility
    # New structure: "Workload metrics" is nested under "Observe & monitor"
    OBSERVE_MONITOR_NAV_OPTIONS = [
        (By.XPATH, "//nav//button[contains(., 'Observe & monitor')]"),
        (By.XPATH, "//nav//a[contains(., 'Observe & monitor')]"),
        (
            By.XPATH,
            "//button[contains(@class, 'pf-v5-c-nav__link') and contains(., 'Observe')]",
        ),
    ]

    WORKLOAD_METRICS_NAV_OPTIONS = [
        (By.XPATH, "//a[contains(., 'Workload metrics')]"),
        (By.XPATH, "//nav//a[contains(@href, 'workload-metrics')]"),
        (By.XPATH, "//nav//a[contains(@href, 'workloadmetrics')]"),
    ]

    PAGE_TITLE_OPTIONS = [
        (By.XPATH, "//h1[contains(text(), 'Workload metrics')]"),
    ]

    # Project selector - multiple options for compatibility (PatternFly v6)
    PROJECT_SELECTOR_OPTIONS = [
        (By.ID, "project-selector"),  # Direct ID match
        (By.CSS_SELECTOR, "[data-testid='project-selector-toggle']"),  # Data testid
        (By.XPATH, "//button[@id='project-selector']"),
        (By.XPATH, "//button[@data-testid='project-selector-toggle']"),
        (
            By.XPATH,
            "//button[contains(@class, 'pf-v6-c-menu-toggle')]",
        ),  # PatternFly v6
        (
            By.XPATH,
            "//button[contains(@class, 'pf-v5-c-menu-toggle')]",
        ),  # PatternFly v5 fallback
        (By.XPATH, "//button[contains(@aria-label, 'project')]"),
        (By.XPATH, "//button[contains(@aria-label, 'Project')]"),
    ]

    # Status locators - support multiple states (Admitted, Running, etc.)
    STATUS_LABEL_OPTIONS = [
        # PatternFly v6
        (By.XPATH, "//span[contains(@class, 'pf-v6-c-label__text')]"),
        # PatternFly v5
        (By.XPATH, "//span[contains(@class, 'pf-v5-c-label__text')]"),
        # Generic
        (By.XPATH, "//span[contains(@class, 'pf-c-label__text')]"),
    ]

    # Status texts that count as "the cluster is up".
    ACCEPTED_STATUSES = ("Running", "Admitted")

    # Workload metrics table
    WORKLOAD_METRICS_TABLE = (
        By.CSS_SELECTOR,
        "[data-testid='workload-resource-metrics-table']",
    )

    # Tab locators - support both PatternFly v5 and v6
    PROJECT_METRICS_TAB_OPTIONS = [
        (
            By.XPATH,
            "//button[contains(@class, 'pf-v6-c-tabs__link') and .//span[text()='Project metrics']]",
        ),
        (
            By.XPATH,
            "//button[contains(@class, 'pf-v5-c-tabs__link') and .//span[text()='Project metrics']]",
        ),
        (By.XPATH, "//button[@role='tab' and .//span[text()='Project metrics']]"),
    ]

    WORKLOAD_STATUS_TAB_OPTIONS = [
        (
            By.XPATH,
            "//button[contains(@class, 'pf-v6-c-tabs__link') and .//span[text()='Workload status']]",
        ),
        (
            By.XPATH,
            "//button[contains(@class, 'pf-v5-c-tabs__link') and .//span[text()='Workload status']]",
        ),
        # XPath string matching is case-sensitive: the generic fallback must
        # use the same "Workload status" spelling as the locators above.
        (By.XPATH, "//button[@role='tab' and .//span[text()='Workload status']]"),
    ]

    RESOURCE_METRICS_TITLE_OPTIONS = [
        (By.XPATH, "//*[contains(text(), 'Requested resources')]"),
        (By.XPATH, "//h1[contains(text(), 'Project metrics')]"),
        (By.XPATH, "//h2[contains(text(), 'Project metrics')]"),
        (By.XPATH, "//*[contains(text(), 'Resource metrics')]"),
        (By.CSS_SELECTOR, "[data-testid='dw-requested-resources']"),
        (By.CSS_SELECTOR, "[data-testid='dw-workload-resource-metrics']"),
    ]

    def __init__(self, driver, timeout=30):
        """Wrap *driver*; *timeout* (seconds) bounds the explicit waits on ``self.wait``."""
        self.driver = driver
        self.wait = WebDriverWait(driver, timeout)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _save_screenshot(self, path):
        """Save a debugging screenshot to *path*; report but never raise on failure."""
        try:
            self.driver.save_screenshot(path)
            print(f"Screenshot saved to: {path}")
        except Exception as e:
            print(f"Could not save screenshot to {path}: {e}")

    def _find_first_displayed(self, locators, label):
        """Return the first displayed element matched by *locators*, or None.

        *label* is only used in the progress messages.
        """
        for by, value in locators:
            try:
                print(f"Trying locator: {by}={value}")
                element = self.driver.find_element(by, value)
                if element.is_displayed():
                    print(f"Found {label} using: {by}={value}")
                    return element
            except Exception as e:
                print(f"Locator {by}={value} not found: {str(e)[:100]}")
        return None

    def _click_tab(self, locators, tab_name, screenshot_path):
        """Locate and click the tab called *tab_name*; raise RuntimeError if it is absent."""
        try:
            print(f"Searching for {tab_name} tab...")
            tab = self._find_first_displayed(locators, f"{tab_name} tab")
            if tab is None:
                self._save_screenshot(screenshot_path)
                raise RuntimeError(f"Could not find {tab_name} tab")

            tab.click()
            time.sleep(2)  # Wait for tab content to load
            print(f"Successfully clicked {tab_name} tab")
        except Exception as e:
            print(f"Failed to click {tab_name} tab: {e}")
            raise

    @staticmethod
    def _last_path_segment(url):
        """Return the final non-empty path segment of *url* ('' when there is none)."""
        return urlsplit(url).path.rstrip("/").rsplit("/", 1)[-1]

    @staticmethod
    def _build_project_url(current_url, project_name):
        """Return *current_url* re-targeted at *project_name*.

        Follows the pattern
        ``.../workload-metrics/<tab>/<project>``: an existing project segment
        is replaced, a missing one is appended, and a missing tab segment
        defaults to ``workload-status``. Query string and fragment are dropped.
        """
        parts = urlsplit(current_url)
        segments = parts.path.rstrip("/").split("/")
        # Position of the *last* "workload-metrics" segment in the path.
        anchor = len(segments) - 1 - segments[::-1].index("workload-metrics")
        after_anchor = segments[anchor + 1 :]

        if len(after_anchor) >= 2:
            # .../workload-metrics/<tab>/<old-project>: swap the project.
            segments[-1] = project_name
        elif len(after_anchor) == 1:
            # .../workload-metrics/<tab>: keep the tab, add the project.
            segments.append(project_name)
        else:
            # .../workload-metrics/: no tab yet, use the documented default.
            segments.extend(["workload-status", project_name])

        return f"{parts.scheme}://{parts.netloc}{'/'.join(segments)}"

    def _navigate_by_direct_url(self):
        """Fallback navigation: try known Workload Metrics URLs; return True on success."""
        print("\nCould not find navigation link, attempting direct URL navigation...")
        current_url = self.driver.current_url
        parts = urlsplit(current_url)
        # Build from the site origin: appending to the full current URL breaks
        # as soon as the browser is on any sub-page of the dashboard.
        if parts.scheme and parts.netloc:
            base_url = f"{parts.scheme}://{parts.netloc}"
        else:
            base_url = current_url.rstrip("/")

        # Try different possible URL patterns
        for path in ("workloadMetrics", "workload-metrics", "workloadmetrics", "workloads"):
            url = f"{base_url}/{path}"
            try:
                print(f"Trying direct navigation to: {url}")
                self.driver.get(url)
                time.sleep(3)

                # Check if we got to a valid page (not 404)
                page_source = self.driver.page_source
                if "404" not in page_source and "not found" not in page_source.lower():
                    print(f"Successfully navigated to: {url}")
                    return True
            except Exception as e:
                print(f"Direct navigation to {url} failed: {str(e)[:100]}")
        return False

    def _print_navigation_debug_info(self):
        """Print a page-source excerpt and the first 20 nav links to aid debugging."""
        page_source = self.driver.page_source
        print(f"\nPage source (chars 1000-3000):\n{page_source[1000:3000]}")

        print("\nSearching for any navigation links...")
        try:
            nav_links = self.driver.find_elements(By.XPATH, "//nav//a")
            print(f"Found {len(nav_links)} navigation links:")
            for link in nav_links[:20]:  # Print first 20
                try:
                    print(
                        f"  - text: '{link.text}', href: '{link.get_attribute('href')}'"
                    )
                except Exception:
                    pass  # a stale/detached link is not worth aborting the dump
        except Exception as e:
            print(f"Could not enumerate nav links: {e}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def navigate(self):
        """Navigate to Workload Metrics page (nested under Observe & monitor).

        Uses the side navigation when available and falls back to direct URLs.

        Raises:
            RuntimeError: if neither the nav link nor any direct URL works.
        """
        # Give React app time to fully render (up to 10 seconds)
        print("Waiting for dashboard React app to fully render...")
        for i in range(10):
            time.sleep(1)
            try:
                body = self.driver.find_element(By.TAG_NAME, "body")
                if len(body.text) > 100:  # Body has substantial content
                    print(f"Dashboard rendered after {i+1} seconds")
                    break
            except Exception:
                pass  # body not ready yet; keep polling

        time.sleep(2)  # Extra wait for animations/transitions

        try:
            # Step 1: Find and expand "Observe & monitor" in the side nav
            print("Searching for 'Observe & monitor' navigation item...")
            observe_monitor = self._find_first_displayed(
                self.OBSERVE_MONITOR_NAV_OPTIONS, "'Observe & monitor'"
            )

            if observe_monitor is None:
                print("Warning: Could not find 'Observe & monitor' navigation item")
            elif observe_monitor.get_attribute("aria-expanded") == "true":
                # Clicking an already-expanded section would collapse it and
                # hide the link we are about to look for.
                print("'Observe & monitor' is already expanded")
            else:
                print("Clicking 'Observe & monitor' to expand submenu...")
                observe_monitor.click()
                time.sleep(1)  # Wait for submenu to expand

            # Step 2: Find and click "Workload metrics" link
            print("Searching for 'Workload metrics' navigation link...")
            workload_metrics_link = self._find_first_displayed(
                self.WORKLOAD_METRICS_NAV_OPTIONS, "'Workload metrics' link"
            )

            if workload_metrics_link is not None:
                print("Clicking 'Workload metrics' link...")
                workload_metrics_link.click()
            elif not self._navigate_by_direct_url():
                # The screenshot is taken by the handler below.
                self._print_navigation_debug_info()
                raise RuntimeError(
                    "Could not find or navigate to Workload Metrics page. "
                    f"Current URL: {self.driver.current_url}, "
                    f"Page title: {self.driver.title}"
                )

            # Wait for page to load
            print("Waiting for Workload Metrics page to load...")
            time.sleep(3)

            # Verify we're on the right page
            final_url = self.driver.current_url
            print(f"Final URL: {final_url}")
            if "workload" in final_url.lower():
                print(
                    "Successfully navigated to Workload Metrics page (URL indicates success)"
                )
            else:
                print(f"Warning: URL might not be Workload Metrics page: {final_url}")

        except Exception as e:
            print(f"Failed to navigate to Workload Metrics page: {e}")
            self._save_screenshot("/tmp/workload_metrics_nav_failure.png")
            raise

    def select_project(self, project_name):
        """Select a project by navigating directly to the project URL.

        Does nothing when the current URL already ends in *project_name*.

        Raises:
            RuntimeError: if the current URL is not a Workload Metrics URL.
        """
        try:
            # Wait a bit for the page to fully load
            time.sleep(2)

            current_url = self.driver.current_url
            print(f"Current URL: {current_url}")

            # Compare the whole last path segment: a substring test would
            # treat project "test" as selected on ".../test-ns-rayupgrade".
            if self._last_path_segment(current_url) == project_name:
                print(f"Project '{project_name}' is already selected")
                return

            print(f"Selecting project '{project_name}' via URL navigation")
            self._select_project_by_url(project_name)

        except Exception as e:
            print(f"Failed to select project {project_name}: {e}")
            self._save_screenshot("/tmp/select_project_failure.png")
            raise

    def _select_project_by_url(self, project_name):
        """Select project by navigating to the URL with the project name.

        Raises:
            RuntimeError: if the current URL has no ``/workload-metrics/`` part.
        """
        current_url = self.driver.current_url
        print("Attempting to select project via URL navigation")
        print(f"Current URL: {current_url}")

        # URL pattern: .../observe-monitor/workload-metrics/workload-status/{project_name}
        on_metrics_page = (
            "/workload-metrics/" in current_url
            and "workload-metrics" in urlsplit(current_url).path.split("/")
        )
        if not on_metrics_page:
            raise RuntimeError(
                "Cannot determine correct URL pattern to select project. "
                f"Current URL: {current_url}"
            )

        new_url = self._build_project_url(current_url, project_name)
        print(f"Navigating to: {new_url}")
        self.driver.get(new_url)
        time.sleep(3)  # Wait for page to load

        print(f"New URL after navigation: {self.driver.current_url}")
        print(f"Successfully navigated to project: {project_name}")

    def verify_cluster_running(self):
        """Return True when any visible status label reads Running or Admitted.

        Falls back to "the workload metrics table is visible". Never raises;
        returns False on any error.
        """
        try:
            # Wait a bit for the page to load
            time.sleep(2)

            print("Searching for cluster status...")
            for by, value in self.STATUS_LABEL_OPTIONS:
                try:
                    print(f"Trying locator: {by}={value}")
                    status_elements = self.driver.find_elements(by, value)
                    if status_elements:
                        print(f"Found {len(status_elements)} status label(s)")

                    for elem in status_elements:
                        if not elem.is_displayed():
                            continue
                        status_text = elem.text.strip()
                        print(f"Status text: {status_text}")
                        if status_text in self.ACCEPTED_STATUSES:
                            print(f"✓ Cluster is in {status_text} state")
                            return True
                except Exception as e:
                    print(f"Locator {by}={value} error: {str(e)[:100]}")

            # Fallback: Try to find the workload metrics table as indication of success
            print("Status label not found, checking for workload metrics table...")
            try:
                table = self.driver.find_element(*self.WORKLOAD_METRICS_TABLE)
                if table.is_displayed():
                    print("✓ Workload metrics table is visible (cluster exists)")
                    return True
            except Exception:
                pass  # table absent: fall through to the failure result

            print("✗ Could not verify cluster status")
            return False

        except Exception as e:
            print(f"Failed to verify cluster status: {e}")
            traceback.print_exc()
            return False

    def click_project_metrics_tab(self):
        """Click on the Project Metrics tab; raise RuntimeError if it cannot be found."""
        self._click_tab(
            self.PROJECT_METRICS_TAB_OPTIONS,
            "Project Metrics",
            "/tmp/project_metrics_tab_not_found.png",
        )

    def verify_metrics_visible(self):
        """Return True when resource metrics (or the tab's content area) are visible.

        Never raises; returns False on any error.
        """
        try:
            # Wait a bit for content to load
            time.sleep(2)

            print("Searching for resource metrics indicators...")
            indicator = self._find_first_displayed(
                self.RESOURCE_METRICS_TITLE_OPTIONS, "resource metrics"
            )
            if indicator is not None:
                return True

            # If no specific metrics title found, check if the tab content area exists
            try:
                tab_content = self.driver.find_element(
                    By.XPATH,
                    "//section[@id='project-metrics-tab-content' or contains(@aria-labelledby, 'project-metrics')]",
                )
                if tab_content.is_displayed():
                    print("Project metrics tab content is visible")
                    return True
            except Exception:
                pass  # content area absent: fall through to the failure result

            self._save_screenshot("/tmp/metrics_not_visible.png")
            print("Resource metrics not visible")
            return False
        except Exception as e:
            print(f"Failed to verify metrics visibility: {e}")
            return False

    def click_workload_status_tab(self):
        """Click on the Workload Status tab; raise RuntimeError if it cannot be found."""
        self._click_tab(
            self.WORKLOAD_STATUS_TAB_OPTIONS,
            "Workload Status",
            "/tmp/workload_status_tab_not_found.png",
        )

    def verify_cluster_in_workload_list(self, cluster_name):
        """Return True when *cluster_name* is listed with Running or Admitted status.

        Never raises; returns False when the row or its status cannot be
        found, or when the status is anything else.
        """
        try:
            # Wait for table to load
            time.sleep(2)

            # Look for the cluster name in the table
            cluster_cell = self.wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, f"//td[contains(text(), '{cluster_name}')]")
                )
            )
            is_visible = cluster_cell.is_displayed()
            print(f"Cluster {cluster_name} found in workload list: {is_visible}")

            if not is_visible:
                return False

            # [1] on the reverse ancestor axis selects the *nearest* row;
            # without it the outermost <tr> would win in nested tables.
            cluster_row = cluster_cell.find_element(By.XPATH, "./ancestor::tr[1]")

            # Try PatternFly v6, then v5, then the raw status cell text.
            status_lookups = (
                (
                    "v6 label",
                    ".//td[@data-label='Status']//span[contains(@class, 'pf-v6-c-label__text')]",
                ),
                (
                    "v5 label",
                    ".//td[@data-label='Status']//span[contains(@class, 'pf-v5-c-label__text')]",
                ),
                ("generic", ".//td[@data-label='Status']"),
            )
            status_text = None
            for label, xpath in status_lookups:
                try:
                    status_text = cluster_row.find_element(By.XPATH, xpath).text
                except Exception:
                    continue
                print(f"Found status ({label}): {status_text}")
                break

            if status_text is None:
                print("Could not find status cell")
                return False

            status_text = status_text.strip()
            if status_text in self.ACCEPTED_STATUSES:
                print(f"✓ Cluster {cluster_name} status is {status_text}")
                return True

            print(f"✗ Cluster {cluster_name} has unexpected status: {status_text}")
            return False

        except Exception as e:
            print(f"Failed to verify cluster {cluster_name} in workload list: {e}")
            traceback.print_exc()
            return False


# Patch boundary: the lines below belong to the next file in this patch and
# are carried over unchanged (diff header kept as comments).
# diff --git a/tests/e2e_v2/upgrade/01_raycluster_sdk_upgrade_test.py b/tests/e2e_v2/upgrade/01_raycluster_sdk_upgrade_test.py
# new file mode 100644
# index 00000000..78936ffa
# --- /dev/null
# +++ b/tests/e2e_v2/upgrade/01_raycluster_sdk_upgrade_test.py
# @@ -0,0 +1,261 @@
import pytest
import requests
from time import sleep

from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication
from codeflare_sdk.ray.client import RayJobClient

from tests.e2e.support import *
from codeflare_sdk.ray.cluster.cluster import get_cluster

from codeflare_sdk.common import _kube_api_error_handling
@pytest.fixture(autouse=True)
def cleanup_on_failure(self, request):
    """Clean up the namespace and Kueue resources if the pre-upgrade test fails.

    Runs after every test in the class. The outcome is read from
    ``request.node.rep_call``; when that attribute is missing the test is
    treated as not failed and nothing is cleaned up.
    """
    # Everything below the yield runs after the test body.
    yield

    report = getattr(request.node, "rep_call", None)
    if report is None or not report.failed:
        return

    print(f"\n=== Pre-upgrade test failed, cleaning up namespace: {namespace} ===")

    # Run each step on its own so a failure deleting the namespace does not
    # leave the Kueue resources behind.
    all_ok = True
    for cleanup_step in (delete_namespace, delete_kueue_resources):
        try:
            cleanup_step(self)
        except Exception as e:
            all_ok = False
            print(f"Warning: Failed to cleanup namespace {namespace}: {e}")

    if all_ok:
        print(f"Successfully cleaned up namespace: {namespace}")
@pytest.mark.post_upgrade
class TestMnistJobSubmit:
    """Post-upgrade checks against the "mnist" RayCluster created before the upgrade.

    Verifies that the Ray Jobs API rejects unauthenticated submissions and that
    an authenticated client can submit the MNIST job and see it succeed.
    """

    def setup_method(self):
        """Log in with the current ``oc`` token and look up the existing cluster."""
        initialize_kubernetes_client(self)
        auth = TokenAuthentication(
            token=run_oc_command(["whoami", "--show-token=true"]),
            server=run_oc_command(["whoami", "--show-server=true"]),
            skip_tls=True,
        )
        auth.login()
        self.namespace = namespace
        self.cluster = get_cluster("mnist", self.namespace)
        if not self.cluster:
            # The cluster is created by the pre-upgrade class in this module.
            raise RuntimeError(
                "TestMNISTRayClusterApply needs to be run before this test"
            )

    def test_mnist_job_submission(self):
        self.assert_jobsubmit_withoutLogin(self.cluster)
        self.assert_jobsubmit_withlogin(self.cluster)

    @staticmethod
    def _submission_blocked(response):
        """Return True if ``response`` shows the unauthenticated submit was rejected.

        Treated as blocked: 401, 403, 302, or a 200 whose body is not a JSON
        document carrying ``job_id``/``submission_id`` (e.g. an HTML login page).
        Any other status code is reported as not blocked.
        """
        if response.status_code in (401, 403, 302):
            return True
        if response.status_code != 200:
            return False

        content_type = response.headers.get("Content-Type", "")
        if "text/html" in content_type or "application/json" not in content_type:
            # Got HTML (likely login page) instead of JSON
            return True
        try:
            payload = response.json()
        except ValueError:
            # Declared JSON but not parseable - not a job submission response
            return True
        # A real submission response carries a job/submission id.
        return not ("job_id" in payload or "submission_id" in payload)

    # Assertions
    def assert_jobsubmit_withoutLogin(self, cluster):
        """Assert that the Jobs API cannot be used without credentials."""
        dashboard_url = cluster.cluster_dashboard_uri()

        # HTTPRoute (https://host/ray/<ns>/<cluster>) and OpenShift Route
        # (https://host) both serve the Jobs API directly under the dashboard URL.
        api_url = dashboard_url + "/api/jobs/"

        jobdata = {
            "entrypoint": "python mnist.py",
            "runtime_env": {
                "working_dir": "./tests/e2e/",
                "pip": "./tests/e2e/mnist_pip_requirements.txt",
                "env_vars": get_setup_env_variables(),
            },
        }

        # Redirects are followed so a redirect-to-login ends on the login page.
        response = requests.post(
            api_url, verify=False, json=jobdata, allow_redirects=True
        )
        assert self._submission_blocked(
            response
        ), f"Job submission succeeded without authentication! Status: {response.status_code}, Response: {response.text[:200]}"

        # Also verify that RayJobClient cannot be used without authentication.
        try:
            client = RayJobClient(address=dashboard_url, verify=False)
            # Call a method to trigger the connection and authentication check
            client.list_jobs()
        except Exception:
            # Any exception is expected when using the client without auth
            pass
        else:
            # Fail outside the try block: AssertionError is an Exception, so
            # raising inside it would be swallowed by the handler above.
            pytest.fail(
                "RayJobClient succeeded without authentication - this should not be possible"
            )

    def assert_jobsubmit_withlogin(self, cluster):
        """Submit the MNIST job with a bearer token and assert that it succeeds.

        Raises:
            TimeoutError: If the job is not in a terminal state after ~900s.
        """
        auth_token = run_oc_command(["whoami", "--show-token=true"])
        ray_dashboard = cluster.cluster_dashboard_uri()
        header = {"Authorization": f"Bearer {auth_token}"}
        client = RayJobClient(address=ray_dashboard, headers=header, verify=False)

        # Submit the job
        submission_id = client.submit_job(
            entrypoint="python mnist.py",
            runtime_env={
                "working_dir": "./tests/e2e/",
                "pip": "./tests/e2e/mnist_pip_requirements.txt",
                "env_vars": get_setup_env_variables(),
            },
        )
        print(f"Submitted job with ID: {submission_id}")

        timeout = 900
        poll_interval = 5
        elapsed = 0
        while True:
            status = client.get_job_status(submission_id)
            if status.is_terminal():
                break
            print(status)
            if elapsed >= timeout:
                raise TimeoutError(f"job has timed out after waiting {timeout}s")
            sleep(poll_interval)
            elapsed += poll_interval

        logs = client.get_job_logs(submission_id)
        print(logs)

        self.assert_job_completion(status)

        client.delete_job(submission_id)

    def assert_job_completion(self, status):
        """Assert that the terminal job ``status`` equals ``"SUCCEEDED"``."""
        print(f"Job has completed: '{status}'")
        assert (
            status == "SUCCEEDED"
        ), f"Job finished with status '{status}', expected 'SUCCEEDED'"
b/tests/e2e_v2/upgrade/02_dashboard_ui_upgrade_test.py new file mode 100644 index 00000000..39de81b4 --- /dev/null +++ b/tests/e2e_v2/upgrade/02_dashboard_ui_upgrade_test.py @@ -0,0 +1,177 @@ +# Copyright 2024 IBM, Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import sys +import os + +# Add tests/ui to path to import page objects +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "ui")) + +from pages.distributed_workloads_page import DistributedWorkloadsPage + +# Import cleanup functions +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "e2e")) +from support import ( + delete_namespace, + delete_kueue_resources, + initialize_kubernetes_client, +) + +# Fixtures are imported via conftest.py in this directory + +# Test configuration - should match the cluster created in raycluster_sdk_upgrade_test.py +NAMESPACE = "test-ns-rayupgrade" +CLUSTER_NAME = "mnist" + + +@pytest.mark.pre_upgrade +@pytest.mark.ui +class TestDistributedWorkloadsUIPreUpgrade: + """ + UI tests to verify Ray cluster appears in RHOAI Dashboard before upgrade. + These tests validate that the cluster created by TestMNISTRayClusterApply + is visible and properly displayed in the Workload Metrics UI (under Observe & monitor). 
@pytest.fixture(autouse=True)
def cleanup_on_failure(self, request):
    """Clean up the namespace and Kueue resources if the pre-upgrade UI test fails.

    Before the test: initializes the Kubernetes client on ``self`` and sets
    ``self.namespace`` so the cleanup helpers can be called later.
    After the test: if ``request.node.rep_call`` reports a failure, deletes the
    namespace and then tries to delete the Kueue resources.
    """
    # Initialize kubernetes client for cleanup
    initialize_kubernetes_client(self)
    self.namespace = NAMESPACE

    # Everything below the yield runs after the test body.
    yield

    report = getattr(request.node, "rep_call", None)
    if report is None or not report.failed:
        return

    print(f"\n=== Pre-upgrade UI test failed, cleaning up namespace: {NAMESPACE} ===")
    try:
        delete_namespace(self)
        # Kueue resources might already have been cleaned up by
        # TestMNISTRayClusterApply; try again and tolerate a failure.
        try:
            delete_kueue_resources(self)
        except Exception as e:
            # Exception instead of a bare except, so interrupts still
            # propagate; the reason is logged rather than silently dropped.
            print(f"Kueue resources were not deleted (may already be gone): {e}")
        print(f"Successfully cleaned up namespace: {NAMESPACE}")
    except Exception as e:
        print(f"Warning: Failed to cleanup namespace {NAMESPACE}: {e}")
@pytest.mark.post_upgrade
@pytest.mark.ui
class TestDistributedWorkloadsUIPostUpgrade:
    """
    Post-upgrade UI checks for the Ray cluster in the RHOAI Dashboard.

    Confirms that the cluster created before the upgrade is still listed and
    healthy in the Workload Metrics UI (under Observe & monitor).
    """

    def test_verify_cluster_persists_after_upgrade(
        self, selenium_driver, login_to_dashboard
    ):
        """
        Check that the cluster is still shown with the expected status and
        metrics after the upgrade.

        Runs the same sequence of page checks as the pre-upgrade UI test:
        cluster state, Project Metrics tab, then Workload Status tab.
        """
        page = DistributedWorkloadsPage(selenium_driver)

        # Workload Metrics page lives under "Observe & monitor".
        print("\n=== Navigating to Workload Metrics page ===")
        page.navigate()

        print(f"\n=== Selecting project: {NAMESPACE} ===")
        page.select_project(NAMESPACE)

        # Accepted states still need confirming with the dw team; historically
        # the status shown here was "Running".
        print(
            "\n=== Verifying cluster is still in Running or Admitted state after upgrade ==="
        )
        cluster_ok = page.verify_cluster_running()
        assert (
            cluster_ok
        ), f"Cluster in {NAMESPACE} should still be in Running or Admitted state after upgrade"

        # Project Metrics tab: resource metrics must still render.
        print("\n=== Checking Project Metrics tab ===")
        page.click_project_metrics_tab()
        metrics_ok = page.verify_metrics_visible()
        assert (
            metrics_ok
        ), "Resource metrics should still be visible on Project Metrics tab after upgrade"

        # Workload Status tab: the cluster row must still be listed.
        print("\n=== Checking Workload Status tab ===")
        page.click_workload_status_tab()
        listed_ok = page.verify_cluster_in_workload_list(CLUSTER_NAME)
        assert (
            listed_ok
        ), f"Cluster '{CLUSTER_NAME}' should still appear in workload list with Running or Admitted status after upgrade"

        print("\n=== Post-upgrade UI verification completed successfully ===")
        print(
            "The cluster has successfully persisted through the upgrade and remains functional."
        )
def random_suffix(length: int = 5) -> str:
    """
    Build a random suffix of lowercase ASCII letters and digits.

    The characters are drawn with ``random.choices`` from ``a-z0-9``; this
    function does not delegate to ``random_choice()``.

    Args:
        length: Number of characters to generate.

    Returns:
        The generated suffix (empty when ``length`` is 0).
    """
    pool = string.ascii_lowercase + string.digits
    picked = random.choices(pool, k=length)
    return "".join(picked)
def wait_for_rayjob_completion(
    job_api,
    job_name: str,
    namespace: str,
    timeout: int = 600,
) -> bool:
    """
    Poll a RayJob until its deployment status is terminal or the timeout expires.

    The job is polled every 10 seconds via ``job_api.get_job_status``. An empty
    status, or an exception raised while checking, is treated as "not finished
    yet" and the poll is retried.

    Args:
        job_api: Object exposing ``get_job_status(name=..., k8s_namespace=...)``.
        job_name: Name of the RayJob.
        namespace: Namespace of the RayJob.
        timeout: Maximum time to wait in seconds (counted in poll intervals).

    Returns:
        True if ``jobDeploymentStatus`` became ``Complete``; False if it became
        ``Failed`` or ``Suspended``, or if the timeout was reached.
    """
    poll_seconds = 10
    failure_states = ("Failed", "Suspended")
    waited = 0

    while waited < timeout:
        try:
            status = job_api.get_job_status(name=job_name, k8s_namespace=namespace)
            # No status yet -> nothing to inspect this round.
            phase = status.get("jobDeploymentStatus", "") if status else None

            if phase == "Complete":
                return True
            if phase in failure_states:
                message = status.get("message", "No error message")
                print(
                    f"RayJob '{job_name}' failed with status: {phase}. Message: {message}"
                )
                return False
        except Exception as e:
            print(f"Error checking job status: {e}")

        sleep(poll_seconds)
        waited += poll_seconds

    print(f"Timeout waiting for RayJob '{job_name}' completion")
    return False
def wait_for_pod_completion_phase(
    api_instance: client.CoreV1Api,
    namespace: str,
    pod_name: str,
    timeout: int = 600,
) -> Optional[str]:
    """
    Poll a pod until it reaches a terminal phase and return that phase.

    Unlike pod_execution.wait_for_pod_completion, which returns a
    PodExecutionResult, this returns only the phase string. The pod is read
    every 5 seconds; an ``ApiException`` on a read is ignored and the read is
    retried on the next poll.

    Args:
        api_instance: Kubernetes CoreV1Api instance.
        namespace: Namespace of the pod.
        pod_name: Name of the pod.
        timeout: Maximum time to wait in seconds (counted in poll intervals).

    Returns:
        ``'Succeeded'`` or ``'Failed'``, or None if the timeout was reached.
    """
    terminal_phases = ("Succeeded", "Failed")
    poll_seconds = 5
    waited = 0

    while waited < timeout:
        try:
            phase = api_instance.read_namespaced_pod(pod_name, namespace).status.phase
        except client.exceptions.ApiException:
            # Read failed this round; try again after the next sleep.
            phase = None

        if phase in terminal_phases:
            return phase

        sleep(poll_seconds)
        waited += poll_seconds

    return None
def assert_cluster_ready(cluster) -> None:
    """
    Assert that a cluster is ready.

    Calls ``cluster.status()`` once. When the result is a 2-tuple it is
    unpacked as ``(status, ready)`` and ``ready`` must be truthy. Checking
    only for a non-None result would pass for a tuple even when the cluster
    is not ready. Any other non-None result is accepted as-is.

    Args:
        cluster: Object exposing a ``status()`` method.

    Raises:
        AssertionError: If the status is None, or the ``ready`` flag of a
            ``(status, ready)`` result is falsy.
    """
    result = cluster.status()
    # Raise explicitly rather than using ``assert`` so the check also runs
    # under ``python -O``.
    if result is None:
        raise AssertionError("Cluster status is None")

    if isinstance(result, tuple) and len(result) == 2:
        status, ready = result
        if not ready:
            raise AssertionError(f"Cluster is not ready. Status: {status}")
@dataclass
class PodExecutionResult:
    """Outcome of running code in a pod: identity, final phase and captured logs."""

    # Name of the pod the code ran in.
    pod_name: str
    # Namespace the pod was created in.
    namespace: str
    # Final pod phase: 'Succeeded' or 'Failed'.
    phase: str
    # Log output collected from the pod.
    logs: str
    # Exit code, when known.
    exit_code: Optional[int] = None

    @property
    def succeeded(self) -> bool:
        """True when the recorded phase is ``Succeeded``."""
        finished_ok = self.phase == "Succeeded"
        return finished_ok
def _build_script_command(pip_packages: Optional[List[str]]) -> List[str]:
    """Build the ``/bin/sh -c`` command that installs packages and runs the script."""
    import shlex

    # Pick the interpreter once. Chaining `python3 ... || python ...` with `&&`
    # re-runs the script, because sh evaluates `a || b && c` as `(a || b) && c`.
    script_lines = [
        'PY="$(command -v python3 || command -v python)" '
        '|| { echo "no python interpreter found in image" >&2; exit 127; }',
    ]

    if pip_packages:
        # Quote each spec so characters such as `>` or `<` in version
        # constraints are not interpreted by the shell.
        packages = " ".join(shlex.quote(pkg) for pkg in pip_packages)
        script_lines += [
            "mkdir -p /tmp/pip-packages",
            "export PYTHONPATH=/tmp/pip-packages:$PYTHONPATH",
            # Dependency-resolver noise is filtered from the output. `|| true`
            # keeps an empty filtered output (grep exit 1) from counting as a
            # failure. pip's own exit status is not propagated; a failed
            # install surfaces as an ImportError when the script runs.
            f'"$PY" -m pip install --quiet --target /tmp/pip-packages {packages} 2>&1 '
            '| { grep -v -e "WARNING.*dependency conflicts" '
            '-e "ERROR: pip.*dependency resolver" || true; }',
        ]

    # exec: the script runs exactly once, its stderr reaches the pod logs and
    # its exit code becomes the container's exit code.
    script_lines.append('exec "$PY" /scripts/script.py')
    return ["/bin/sh", "-c", "\n".join(script_lines)]


def create_test_pod(
    api_instance: client.CoreV1Api,
    namespace: str,
    code: str,
    name_prefix: str = "test-pod",
    image: str = "python:3.12-slim",
    env_vars: Optional[Dict[str, str]] = None,
    pip_packages: Optional[List[str]] = None,
    service_account: Optional[str] = None,
    timeout: int = 600,
) -> str:
    """
    Create a pod that runs Python code in the cluster.

    This is used to test in-cluster execution scenarios where code runs
    inside a pod (like a Jupyter notebook) rather than from outside.

    The code is stored in a ConfigMap named ``<pod_name>-script`` and mounted
    read-only at ``/scripts`` to avoid shell escaping issues. The ConfigMap
    name is recorded in the pod's ``codeflare-sdk-test/configmap`` annotation.
    If the pod cannot be created, the ConfigMap is deleted again before the
    error is re-raised.

    Args:
        api_instance: Kubernetes CoreV1Api instance.
        namespace: Namespace to create the pod in.
        code: Python code to execute.
        name_prefix: Prefix for the pod name.
        image: Container image to use.
        env_vars: Optional environment variables.
        pip_packages: Optional pip packages to install (into
            ``/tmp/pip-packages``) before the script runs.
        service_account: Optional service account name.
        timeout: Timeout for the pod in seconds (``activeDeadlineSeconds``).

    Returns:
        The pod name.
    """
    pod_name = f"{name_prefix}-{random_suffix()}"
    configmap_name = f"{pod_name}-script"

    # Create ConfigMap with the script
    _create_script_configmap(api_instance, namespace, configmap_name, code)

    env = [
        client.V1EnvVar(name=key, value=value)
        for key, value in (env_vars or {}).items()
    ]

    pod_body = client.V1Pod(
        metadata=client.V1ObjectMeta(
            name=pod_name,
            namespace=namespace,
            labels={
                "app.kubernetes.io/name": pod_name,
                "app.kubernetes.io/managed-by": "codeflare-sdk-tests",
                "codeflare-sdk-test/type": "in-cluster-execution",
            },
            annotations={
                "codeflare-sdk-test/configmap": configmap_name,
            },
        ),
        spec=client.V1PodSpec(
            containers=[
                client.V1Container(
                    name="test",
                    image=image,
                    command=_build_script_command(pip_packages),
                    env=env or None,
                    volume_mounts=[
                        client.V1VolumeMount(
                            name="script-volume",
                            mount_path="/scripts",
                            read_only=True,
                        )
                    ],
                    resources=client.V1ResourceRequirements(
                        requests={"cpu": "100m", "memory": "256Mi"},
                        limits={"cpu": "500m", "memory": "512Mi"},
                    ),
                )
            ],
            volumes=[
                client.V1Volume(
                    name="script-volume",
                    config_map=client.V1ConfigMapVolumeSource(
                        name=configmap_name,
                    ),
                )
            ],
            restart_policy="Never",
            service_account_name=service_account,
            active_deadline_seconds=timeout,
        ),
    )

    try:
        api_instance.create_namespaced_pod(namespace, pod_body)
    except Exception:
        # No pod exists to carry the ConfigMap annotation, so remove the
        # ConfigMap here rather than leaving it orphaned.
        _delete_script_configmap(api_instance, namespace, configmap_name)
        raise
    return pod_name
def run_code_in_pod(
    api_instance: client.CoreV1Api,
    namespace: str,
    code: str,
    image: str = "python:3.12-slim",
    env_vars: Optional[Dict[str, str]] = None,
    pip_packages: Optional[List[str]] = None,
    service_account: Optional[str] = None,
    timeout: int = 600,
    cleanup: bool = True,
    auto_setup_rbac: bool = False,
    custom_api: Optional[client.CustomObjectsApi] = None,
) -> PodExecutionResult:
    """
    Run Python code in a pod and wait for completion.

    This is the main function for in-cluster execution testing.
    It creates a pod, runs the code, waits for completion, and returns results.
    When ``cleanup`` is True the pod (and any auto-created service account)
    is removed even if waiting for the pod raises.

    Args:
        api_instance: Kubernetes CoreV1Api instance.
        namespace: Namespace to create the pod in.
        code: Python code to execute.
        image: Container image to use.
        env_vars: Optional environment variables.
        pip_packages: Optional pip packages to install.
        service_account: Optional service account name. If not provided and
            auto_setup_rbac=True, a service account with RBAC will be created.
        timeout: Timeout for the pod in seconds.
        cleanup: Whether to delete the pod after completion.
        auto_setup_rbac: If True and no service_account provided, automatically
            create a service account with RBAC permissions for RayJob operations.
        custom_api: CustomObjectsApi instance (required if auto_setup_rbac=True).

    Returns:
        PodExecutionResult with logs and status.

    Raises:
        ValueError: If auto_setup_rbac is requested without a custom_api.
    """
    from .setup import (
        setup_in_cluster_test_environment,
        cleanup_in_cluster_test_environment,
    )

    auto_created_sa = None
    if auto_setup_rbac and not service_account:
        if custom_api is None:
            raise ValueError("custom_api is required when auto_setup_rbac=True")
        auto_created_sa = setup_in_cluster_test_environment(
            api_instance=api_instance,
            custom_api=custom_api,
            namespace=namespace,
            name_prefix="test-pod",
        )
        service_account = auto_created_sa

    # Stays None until the pod exists, so the finally block knows whether
    # there is anything to delete.
    pod_name = None
    try:
        pod_name = create_test_pod(
            api_instance=api_instance,
            namespace=namespace,
            code=code,
            image=image,
            env_vars=env_vars,
            pip_packages=pip_packages,
            service_account=service_account,
            timeout=timeout,
        )

        return wait_for_pod_completion(api_instance, namespace, pod_name, timeout)
    finally:
        if cleanup:
            # Pod first, then the service account it runs under. Both steps
            # are deliberately best-effort: a teardown failure must not mask
            # the execution result or the original exception.
            if pod_name is not None:
                try:
                    delete_test_pod(api_instance, namespace, pod_name)
                except Exception:
                    pass

            if auto_created_sa and custom_api is not None:
                try:
                    cleanup_in_cluster_test_environment(
                        api_instance=api_instance,
                        custom_api=custom_api,
                        namespace=namespace,
                        service_account_name=auto_created_sa,
                    )
                except Exception:
                    pass
def get_pod_logs(
    api_instance: client.CoreV1Api,
    namespace: str,
    pod_name: str,
    container: Optional[str] = None,
) -> str:
    """
    Fetch the log output of a test pod.

    Args:
        api_instance: Kubernetes CoreV1Api instance.
        namespace: Namespace of the pod.
        pod_name: Name of the pod.
        container: Optional container name.

    Returns:
        The pod logs, or an "Error getting logs: ..." message when the
        API call raises an ApiException.
    """
    try:
        log_text = api_instance.read_namespaced_pod_log(
            pod_name,
            namespace,
            container=container,
        )
    except client.exceptions.ApiException as api_error:
        # Report the failure as text instead of raising.
        return f"Error getting logs: {api_error}"
    else:
        return log_text
def cleanup_test_pods(
    api_instance: client.CoreV1Api,
    namespace: str,
) -> None:
    """
    Clean up all test pods and their ConfigMaps in a namespace.

    Cleanup is best-effort: API errors are swallowed, but a failure on one
    resource no longer stops the remaining resources from being cleaned up.

    Args:
        api_instance: Kubernetes CoreV1Api instance.
        namespace: Namespace to clean up.
    """
    try:
        pods = api_instance.list_namespaced_pod(
            namespace,
            label_selector="codeflare-sdk-test/type=in-cluster-execution",
        ).items
    except client.exceptions.ApiException:
        pods = []

    for pod in pods:
        try:
            delete_test_pod(api_instance, namespace, pod.metadata.name)
        except client.exceptions.ApiException:
            # delete_test_pod re-raises non-404 errors; keep going so the
            # other pods and the ConfigMap sweep below still run.
            continue

    # Sweep ConfigMaps that were not removed together with their pod.
    try:
        configmaps = api_instance.list_namespaced_config_map(
            namespace,
            label_selector="codeflare-sdk-test/type=script-configmap",
        ).items
    except client.exceptions.ApiException:
        configmaps = []

    for cm in configmaps:
        try:
            _delete_script_configmap(api_instance, namespace, cm.metadata.name)
        except client.exceptions.ApiException:
            continue
def create_test_service_account(
    api_instance: client.CoreV1Api,
    namespace: str,
    name_prefix: str = "test-sa",
) -> str:
    """
    Create a labelled ServiceAccount with a randomized name.

    Args:
        api_instance: Kubernetes CoreV1Api instance.
        namespace: Namespace to create the service account in.
        name_prefix: Prefix for the service account name; a random suffix
            is appended after a dash.

    Returns:
        The generated service account name.
    """
    generated_name = f"{name_prefix}-{random_suffix()}"

    test_labels = {
        "app.kubernetes.io/managed-by": "codeflare-sdk-tests",
        "codeflare-sdk-test/type": "in-cluster-execution",
    }
    metadata = V1ObjectMeta(
        name=generated_name,
        namespace=namespace,
        labels=test_labels,
    )

    api_instance.create_namespaced_service_account(
        namespace, V1ServiceAccount(metadata=metadata)
    )
    return generated_name
def delete_test_service_account(
    api_instance: client.CoreV1Api,
    custom_api: client.CustomObjectsApi,
    namespace: str,
    service_account_name: str,
) -> None:
    """
    Remove a test ServiceAccount together with its Role and RoleBinding.

    Resources that are already gone (HTTP 404) are skipped; any other
    ApiException is re-raised and stops the remaining deletions.

    Args:
        api_instance: Kubernetes CoreV1Api instance.
        custom_api: CustomObjectsApi instance (unused but kept for API consistency).
        namespace: Namespace where resources exist.
        service_account_name: Name of the service account to delete.
    """
    rbac_api = client.RbacAuthorizationV1Api()

    # Order: binding, then role, then the account itself.
    deletion_steps = (
        (
            rbac_api.delete_namespaced_role_binding,
            f"{service_account_name}-rolebinding",
        ),
        (rbac_api.delete_namespaced_role, f"{service_account_name}-role"),
        (api_instance.delete_namespaced_service_account, service_account_name),
    )

    for delete_call, resource_name in deletion_steps:
        try:
            delete_call(resource_name, namespace)
        except client.exceptions.ApiException as api_error:
            if api_error.status != 404:
                raise
def cleanup_in_cluster_test_environment(
    api_instance: client.CoreV1Api,
    custom_api: client.CustomObjectsApi,
    namespace: str,
    service_account_name: str,
) -> None:
    """
    Tear down the in-cluster test environment.

    Delegates to ``delete_test_service_account`` with the same arguments.

    Args:
        api_instance: Kubernetes CoreV1Api instance.
        custom_api: CustomObjectsApi instance.
        namespace: Namespace where resources exist.
        service_account_name: Name of the service account to clean up.
    """
    teardown_kwargs = {
        "api_instance": api_instance,
        "custom_api": custom_api,
        "namespace": namespace,
        "service_account_name": service_account_name,
    }
    delete_test_service_account(**teardown_kwargs)
def create_resource_flavor(
    custom_api: client.CustomObjectsApi,
    flavor_name: str,
    node_labels: Optional[Dict[str, str]] = None,
    tolerations: Optional[List[Dict[str, Any]]] = None,
) -> None:
    """
    Create a Kueue ResourceFlavor unless one with that name already exists.

    Args:
        custom_api: Kubernetes CustomObjectsApi instance.
        flavor_name: Name of the ResourceFlavor to create.
        node_labels: Optional node labels for the flavor.
        tolerations: Optional tolerations for the flavor; omitted from the
            spec when empty or None.

    Raises:
        ApiException: If the existence check fails with a status other than 404,
            or if creation fails.
    """
    flavor_spec = {"nodeLabels": node_labels or {}}
    if tolerations:
        flavor_spec["tolerations"] = tolerations

    manifest = {
        "apiVersion": "kueue.x-k8s.io/v1beta1",
        "kind": "ResourceFlavor",
        "metadata": {"name": flavor_name},
        "spec": flavor_spec,
    }

    try:
        custom_api.get_cluster_custom_object(
            group="kueue.x-k8s.io",
            plural="resourceflavors",
            version="v1beta1",
            name=flavor_name,
        )
    except client.exceptions.ApiException as api_error:
        # Only a 404 means "not there yet"; anything else is a real error.
        if api_error.status != 404:
            raise
        custom_api.create_cluster_custom_object(
            group="kueue.x-k8s.io",
            plural="resourceflavors",
            version="v1beta1",
            body=manifest,
        )
        print(f"ResourceFlavor '{flavor_name}' created")
    else:
        print(f"ResourceFlavor '{flavor_name}' already exists")
def create_limited_cluster_queue(
    custom_api: client.CustomObjectsApi,
    queue_name: str,
    flavor_name: str,
    is_openshift: bool = False,
) -> None:
    """
    Create a small-quota ClusterQueue for preemption testing.

    This mirrors the logic from tests/e2e/support.py create_limited_kueue_resources.
    The queue covers cpu and memory only, and is created without first
    checking whether it already exists.

    Args:
        custom_api: Kubernetes CustomObjectsApi instance.
        queue_name: Name of the ClusterQueue to create.
        flavor_name: Name of the ResourceFlavor to use.
        is_openshift: Whether running on OpenShift (affects memory quota).
    """
    # The CPU quota is the same on every platform; only memory differs.
    cpu_quota = 3
    memory_quota = "15Gi" if is_openshift else "10Gi"

    flavor_quota = {
        "name": flavor_name,
        "resources": [
            {"name": "cpu", "nominalQuota": cpu_quota},
            {"name": "memory", "nominalQuota": memory_quota},
        ],
    }
    resource_group = {
        "coveredResources": ["cpu", "memory"],
        "flavors": [flavor_quota],
    }
    manifest = {
        "apiVersion": "kueue.x-k8s.io/v1beta1",
        "kind": "ClusterQueue",
        "metadata": {"name": queue_name},
        "spec": {
            "namespaceSelector": {},
            "resourceGroups": [resource_group],
        },
    }

    custom_api.create_cluster_custom_object(
        group="kueue.x-k8s.io",
        plural="clusterqueues",
        version="v1beta1",
        body=manifest,
    )
    print(f"Limited ClusterQueue '{queue_name}' created")
def delete_kueue_resources(
    custom_api: client.CustomObjectsApi,
    cluster_queues: Optional[List[str]] = None,
    resource_flavors: Optional[List[str]] = None,
) -> None:
    """
    Delete the named ClusterQueues, then the named ResourceFlavors.

    This mirrors the logic from tests/e2e/support.py delete_kueue_resources.
    A 404 is ignored silently; other API errors are printed and the loop
    continues with the next resource.

    Args:
        custom_api: Kubernetes CustomObjectsApi instance.
        cluster_queues: List of ClusterQueue names to delete.
        resource_flavors: List of ResourceFlavor names to delete.
    """
    # ClusterQueues go first (order matters for cleanup), flavors second.
    deletion_plan = (
        ("ClusterQueue", "clusterqueues", cluster_queues or []),
        ("ResourceFlavor", "resourceflavors", resource_flavors or []),
    )

    for kind, plural, names in deletion_plan:
        for resource_name in names:
            try:
                custom_api.delete_cluster_custom_object(
                    group="kueue.x-k8s.io",
                    plural=plural,
                    version="v1beta1",
                    name=resource_name,
                )
                print(f"{kind} '{resource_name}' deleted")
            except client.exceptions.ApiException as api_error:
                if api_error.status != 404:
                    print(f"Error deleting {kind} '{resource_name}': {api_error}")
def main():
    """
    Run a minimal CPU task on Ray and report the outcome.

    Prints the marker EXISTING_CLUSTER_JOB_SUCCESS on success.

    Returns:
        0 on success, 1 if any exception was raised. Ray is shut down
        in both cases.
    """
    try:
        ray.init()
        print("Starting CPU training task...")
        print(f"Ray initialized. Cluster resources: {ray.cluster_resources()}")

        @ray.remote
        def train_worker(worker_id):
            try:
                train_func({"worker_rank": worker_id})
            except Exception as err:
                # Fallback path: report and recompute the loss locally.
                print(f"Ray Train context not available, using fallback: {err}")
                loss = sum(i * i for i in range(1000))
                print(
                    f"Worker {worker_id} completed CPU training task. Result: {loss}"
                )
                return {"loss": loss, "worker_rank": worker_id}

            loss = sum(i * i for i in range(1000))
            return {"loss": loss, "worker_rank": worker_id}

        pending = [train_worker.remote(i) for i in range(1)]
        all_metrics = {}
        for worker_metrics in ray.get(pending):
            if not isinstance(worker_metrics, dict):
                continue
            all_metrics.update(worker_metrics)

        print(f"Training completed successfully. Metrics: {all_metrics}")
        print("EXISTING_CLUSTER_JOB_SUCCESS")
        return 0

    except Exception as err:
        print(f"FAILURE: Exception occurred: {err}")
        traceback.print_exc()
        return 1
    finally:
        ray.shutdown()
def main():
    """
    Run a minimal Ray Train task that requests a GPU worker.

    Steps:
    1. Initialize Ray and print the cluster resources.
    2. Fit a single-worker TorchTrainer configured with use_gpu=True.
    3. Print the marker EXISTING_CLUSTER_JOB_SUCCESS on success.

    Returns:
        0 on success, 1 on failure
    """
    try:
        # Initialize Ray (auto-connects to cluster when run as RayJob)
        ray.init()
        print("Starting GPU training task...")

        resources = ray.cluster_resources()
        print(f"Cluster resources: {resources}")

        # Informational only: the trainer below requests a GPU regardless.
        has_gpu = resources.get("GPU", 0) > 0
        print(f"GPU available in cluster: {has_gpu}")

        # One GPU worker is enough for a minimal validation run.
        scaling = ScalingConfig(
            num_workers=1,
            use_gpu=True,
        )
        trainer = TorchTrainer(train_func, scaling_config=scaling)

        fit_result = trainer.fit()
        print(f"Training completed successfully. Metrics: {fit_result.metrics}")

        # Success marker that tests can check for
        print("EXISTING_CLUSTER_JOB_SUCCESS")
        return 0

    except Exception as err:
        print(f"FAILURE: Exception occurred: {err}")
        import traceback

        traceback.print_exc()
        return 1
    finally:
        ray.shutdown()
def test_parallel_execution():
    """
    Test that tasks run in parallel.

    Launches three one-second ``slow_task`` calls and checks that together
    they finish in under 2.5 seconds.

    Returns:
        True if the tasks finished within the threshold, False otherwise.
    """
    print("Testing parallel execution...")

    # A monotonic clock is immune to wall-clock adjustments (e.g. NTP),
    # which could otherwise skew the measured duration.
    started = time.monotonic()

    # Launch 3 tasks that each sleep for 1 second and wait for all of them.
    ray.get([slow_task.remote(1) for _ in range(3)])

    elapsed = time.monotonic() - started

    # If parallel, should complete in ~1-2 seconds, not 3+ seconds
    if elapsed < 2.5:
        print(f" PASS: Parallel tasks completed in {elapsed:.2f}s")
        return True

    print(f" FAIL: Tasks took {elapsed:.2f}s (should be ~1s if parallel)")
    return False
def main():
    """Run every remote-function check and report an exit code.

    Initializes Ray, runs the four checks in order, prints a summary and
    always shuts Ray down before returning.

    Returns:
        int: 0 when every check passed, 1 when any check failed or an
        unexpected exception occurred.
    """
    print("Starting ray remote functions test...")

    def _run_check(check):
        # A check that raises is reported and counted as a failure so the
        # remaining checks still run.
        try:
            return check()
        except Exception as e:
            print(f" ERROR in {check.__name__}: {e}")
            return False

    try:
        ray.init()
        print(f"Ray initialized. Cluster resources: {ray.cluster_resources()}")

        checks = (
            test_simple_tasks,
            test_parallel_execution,
            test_chained_tasks,
            test_actors,
        )
        outcomes = [_run_check(check) for check in checks]

        print(f"\nResults: {sum(outcomes)}/{len(outcomes)} tests passed")

        if not all(outcomes):
            print("FAILURE: Some tests failed")
            return 1

        print("SUCCESS: All remote function tests passed")
        return 0

    except Exception as e:
        print(f"FAILURE: Exception occurred: {e}")
        return 1
    finally:
        ray.shutdown()
        print("Ray shutdown complete")
def get_ray_image():
    """
    Select the Ray container image to use for the current test platform.

    Resolution order:
    1. If the RAY_IMAGE environment variable is set, its value is returned
       verbatim (even when it is an empty string).
    2. If ``is_openshift()`` reports OpenShift, the image returned by
       ``get_ray_image_for_python_version()`` is used.
    3. Otherwise the standard ``rayproject/ray`` image tagged with
       ``constants.RAY_VERSION`` is used (Kind / vanilla Kubernetes).

    Returns:
        str: The image reference to deploy.
    """
    # Explicit override always wins over platform auto-detection
    override = os.environ.get("RAY_IMAGE")
    if override is not None:
        return override

    if not is_openshift():
        # Standard Ray image for Kind/vanilla K8s
        return f"rayproject/ray:{constants.RAY_VERSION}"

    return get_ray_image_for_python_version()
def get_setup_env_variables(**kwargs):
    """Build the environment-variable mapping handed to submitted jobs.

    Args:
        **kwargs: Extra variables to include; each keyword name is used as
            the variable name (converted with ``str``) and the value is
            stored unchanged.

    Returns:
        dict: The keyword arguments plus:
            - PIP_INDEX_URL / PIP_TRUSTED_HOST, copied from the process
              environment when PIP_INDEX_URL is set and non-empty, otherwise
              the public PyPI defaults;
            - the AWS_* storage settings, copied from the process
              environment only when AWS_DEFAULT_ENDPOINT is set and
              non-empty.
    """
    # Use input parameters provided for this function as environment variables
    env_vars = {str(key): value for key, value in kwargs.items()}

    # Use the specified pip index instead of the default (https://pypi.org/simple)
    # when the related environment variable is set to a non-empty value.
    if os.environ.get("PIP_INDEX_URL"):
        env_vars["PIP_INDEX_URL"] = os.environ.get("PIP_INDEX_URL")
        # NOTE(review): this is None when PIP_TRUSTED_HOST is unset; confirm
        # that consumers of the mapping accept a None value.
        env_vars["PIP_TRUSTED_HOST"] = os.environ.get("PIP_TRUSTED_HOST")
    else:
        env_vars["PIP_INDEX_URL"] = "https://pypi.org/simple/"
        env_vars["PIP_TRUSTED_HOST"] = "pypi.org"

    # Use the specified storage bucket reference from which to download datasets
    if os.environ.get("AWS_DEFAULT_ENDPOINT"):
        for name in (
            "AWS_DEFAULT_ENDPOINT",
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_STORAGE_BUCKET",
            "AWS_STORAGE_BUCKET_MNIST_DIR",
        ):
            env_vars[name] = os.environ.get(name)
    return env_vars


def random_choice():
    """Return a random 5-character suffix of lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    return "".join(random.choices(alphabet, k=5))


def _parse_label_env(env_var, default):
    """Parse a node label from an environment variable (format: 'key=value').

    Args:
        env_var: Name of the environment variable to read.
        default: 'key=value' string used when the variable is not set.

    Returns:
        list: ``[key, value]``. Only the first '=' separates key from value,
        so the result always has exactly two elements.

    Raises:
        ValueError: If the resolved string contains no '=' separator.
    """
    label_str = os.getenv(env_var, default)
    # Split on the first '=' only so callers can always unpack two items.
    key, separator, value = label_str.partition("=")
    if not separator:
        raise ValueError(
            f"{env_var} must be in 'key=value' format, got {label_str!r}"
        )
    return [key, value]
def create_namespace(self):
    """Create a uniquely named test namespace and record it on ``self``.

    Sets ``self.namespace`` to ``test-ns-<random suffix>`` and creates that
    namespace through ``self.api_instance``.

    Raises:
        RuntimeError: If generating the name or creating the namespace
            fails; the original exception is chained as the cause.
    """
    try:
        self.namespace = f"test-ns-{random_choice()}"
        namespace_body = client.V1Namespace(
            metadata=client.V1ObjectMeta(name=self.namespace)
        )
        self.api_instance.create_namespace(namespace_body)
    except Exception as e:
        # Raise instead of returning the exception object: a returned
        # RuntimeError is silently discarded by callers, hiding the failure.
        raise RuntimeError(e) from e
def _run_cli_command(binary, args):
    """Run ``binary`` with ``args`` and return its stripped stdout, or None on failure."""
    try:
        result = subprocess.run(
            [binary, *args], capture_output=True, text=True, check=True
        )
        return result.stdout.strip()
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable binary
        # (FileNotFoundError / PermissionError), which is reported the same
        # way as a non-zero exit status instead of escaping to the caller.
        print(f"Error executing '{binary} {' '.join(args)}': {e}")
        return None


def run_oc_command(args):
    """Run ``oc`` with the given argument list.

    Args:
        args: Command-line arguments (strings) passed after ``oc``.

    Returns:
        str | None: The command's stdout with surrounding whitespace
        stripped, or None when the command exits non-zero or cannot be
        started (an error message is printed in that case).
    """
    return _run_cli_command("oc", args)


def run_kubectl_command(args):
    """Run ``kubectl`` with the given argument list.

    Args:
        args: Command-line arguments (strings) passed after ``kubectl``.

    Returns:
        str | None: The command's stdout with surrounding whitespace
        stripped, or None when the command exits non-zero or cannot be
        started (an error message is printed in that case).
    """
    return _run_cli_command("kubectl", args)
def create_resource_flavor(
    self, flavor, default=True, with_labels=False, with_tolerations=False
):
    """Ensure a Kueue ResourceFlavor named ``flavor`` exists.

    Args:
        self: Test object providing ``custom_api`` (and ``api_instance``,
            used by the taint-key lookup).
        flavor: Name of the ResourceFlavor.
        default: Selects which label is applied when ``with_labels`` is
            True: the WORKER_LABEL pair when True, the CONTROL_LABEL pair
            when False.
        with_labels: When True, set ``spec.nodeLabels`` to the selected
            label; otherwise ``nodeLabels`` is empty.
        with_tolerations: When True, add a single ``Exists``/``NoSchedule``
            toleration keyed by TOLERATION_KEY or, if that is unset, the
            key returned by ``get_master_taint_key``.

    Side effects:
        Creates the cluster-scoped object when it does not exist yet and
        sets ``self.resource_flavor`` to ``flavor``.

    Raises:
        kubernetes.client.exceptions.ApiException: If the existence check
            fails with a status other than 404, or if creation fails.
    """
    worker_label, worker_value = _parse_label_env("WORKER_LABEL", "worker-1=true")
    control_label, control_value = _parse_label_env(
        "CONTROL_LABEL", "ingress-ready=true"
    )

    node_labels = {}
    if with_labels:
        node_labels = (
            {worker_label: worker_value} if default else {control_label: control_value}
        )

    spec = {"nodeLabels": node_labels}
    if with_tolerations:
        # Resolve the taint key only when it is actually needed, so flavors
        # without tolerations do not trigger a node listing.
        toleration_key = os.getenv("TOLERATION_KEY") or get_master_taint_key(self)
        spec["tolerations"] = [
            {
                "key": toleration_key,
                "operator": "Exists",
                "effect": "NoSchedule",
            }
        ]

    resource_flavor_json = {
        "apiVersion": "kueue.x-k8s.io/v1beta1",
        "kind": "ResourceFlavor",
        "metadata": {"name": flavor},
        "spec": spec,
    }

    try:
        # Check if resource flavor exists
        self.custom_api.get_cluster_custom_object(
            group="kueue.x-k8s.io",
            plural="resourceflavors",
            version="v1beta1",
            name=flavor,
        )
    except client.exceptions.ApiException as e:
        # Only "not found" means the flavor must be created; any other API
        # error (auth, server failure, ...) is a real problem and propagates.
        if e.status != 404:
            raise
        exists = False
    else:
        exists = True

    if exists:
        print(f"'{flavor}' already exists")
    else:
        # create kueue resource flavor
        self.custom_api.create_cluster_custom_object(
            group="kueue.x-k8s.io",
            plural="resourceflavors",
            version="v1beta1",
            body=resource_flavor_json,
        )
        print(f"'{flavor}' created!")

    self.resource_flavor = flavor
def delete_kueue_resources(self):
    """Delete the Kueue ClusterQueues and ResourceFlavors recorded on ``self``.

    Deletes every name in ``self.cluster_queues`` and then every name in
    ``self.resource_flavors`` through ``self.custom_api``. Either attribute
    may be missing (e.g. skipped tests, or setup that failed part-way); the
    other list is still cleaned up so no cluster-scoped objects are leaked.

    Deletion is best-effort: a failure for one object is printed and the
    remaining objects are still processed.
    """
    # Cluster queues go first, then the flavors they reference.
    for cq in getattr(self, "cluster_queues", []):
        try:
            self.custom_api.delete_cluster_custom_object(
                group="kueue.x-k8s.io",
                plural="clusterqueues",
                version="v1beta1",
                name=cq,
            )
            print(f"\n'{cq}' cluster-queue deleted")
        except Exception as e:
            print(f"\nError deleting cluster-queue '{cq}' : {e}")

    for flavor in getattr(self, "resource_flavors", []):
        try:
            self.custom_api.delete_cluster_custom_object(
                group="kueue.x-k8s.io",
                plural="resourceflavors",
                version="v1beta1",
                name=flavor,
            )
            print(f"'{flavor}' resource-flavor deleted")
        except Exception as e:
            print(f"\nError deleting resource-flavor '{flavor}': {e}")
def wait_for_kueue_admission(self, job_api, job_name, namespace, timeout=120):
    """Poll a job until Kueue has admitted it (it is no longer suspended).

    The job is fetched with ``job_api.get_job`` every 5 seconds. It counts as
    admitted when ``spec.suspend`` is falsy or missing, or when its status
    carries a ``Suspended`` condition with status ``"False"``. While the job
    is still suspended, the conditions of its Kueue workload are printed for
    debugging whenever the waited time is a multiple of 10 seconds. Errors
    raised during a poll are printed and polling continues.

    Args:
        self: Test object passed through to ``get_kueue_workload_for_job``.
        job_api: Object exposing ``get_job(name=..., k8s_namespace=...)``.
        job_name: Name of the job to watch.
        namespace: Namespace of the job.
        timeout: Maximum accumulated sleep time in seconds.

    Returns:
        bool: True once the job is admitted, False on timeout.
    """
    poll_seconds = 5
    waited = 0
    print(f"Waiting for Kueue admission of job '{job_name}'...")

    while waited < timeout:
        try:
            job = job_api.get_job(name=job_name, k8s_namespace=namespace)

            # Primary signal: the suspend flag has been cleared
            if not job.get("spec", {}).get("suspend", False):
                print(f"✓ Job '{job_name}' admitted by Kueue (no longer suspended)")
                return True

            # Debug output every 10 seconds: dump the workload's conditions
            if waited % 10 == 0:
                workload = get_kueue_workload_for_job(self, job_name, namespace)
                if workload:
                    workload_conditions = workload.get("status", {}).get(
                        "conditions", []
                    )
                    print(f"DEBUG: Workload conditions for '{job_name}':")
                    for entry in workload_conditions:
                        print(
                            f" - {entry.get('type')}: {entry.get('status')} - {entry.get('reason', '')} - {entry.get('message', '')}"
                        )

            # Secondary signal: an explicit Suspended=False status condition
            job_conditions = job.get("status", {}).get("conditions", [])
            if any(
                entry.get("type") == "Suspended" and entry.get("status") == "False"
                for entry in job_conditions
            ):
                print(
                    f"✓ Job '{job_name}' admitted by Kueue (Suspended=False condition)"
                )
                return True

        except Exception as e:
            print(f"Error checking job status: {e}")

        sleep(poll_seconds)
        waited += poll_seconds

    print(f"✗ Timeout waiting for Kueue admission of job '{job_name}'")
    return False
def get_kueue_workload_for_job(self, job_name, namespace):
    """Find the Kueue workload owned by a given RayJob.

    Lists the ``kueue.x-k8s.io/v1beta1`` workloads in ``namespace`` through
    ``self.custom_api`` and returns the first one that has an owner
    reference of kind ``RayJob`` named ``job_name``.

    Args:
        self: Test object providing ``custom_api``.
        job_name: Name of the owning RayJob.
        namespace: Namespace to search.

    Returns:
        dict | None: The matching workload object, or None when there is no
        match or the lookup raised (the error is printed).
    """

    def _owned_by_job(candidate):
        owners = candidate.get("metadata", {}).get("ownerReferences", [])
        return any(
            ref.get("kind") == "RayJob" and ref.get("name") == job_name
            for ref in owners
        )

    try:
        # List all workloads in the namespace
        listing = self.custom_api.list_namespaced_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            plural="workloads",
            namespace=namespace,
        )

        # First workload with a matching RayJob owner reference, if any
        match = next(
            (item for item in listing.get("items", []) if _owned_by_job(item)),
            None,
        )
        if match is None:
            print(f"✗ No Kueue workload found for RayJob '{job_name}'")
            return None

        workload_name = match.get("metadata", {}).get("name")
        print(f"✓ Found Kueue workload '{workload_name}' for RayJob '{job_name}'")
        return match

    except Exception as e:
        print(f"Error getting Kueue workload for job '{job_name}': {e}")
        return None
def verify_rayjob_cluster_cleanup(
    cluster_api, rayjob_name: str, namespace: str, timeout: int = 60
):
    """
    Wait until no RayCluster belonging to the given RayJob remains.

    The namespace is polled every 5 seconds. A cluster is attributed to the
    job when its name starts with ``{rayjob_name}-raycluster`` (KubeRay
    appends a suffix to the cluster name).

    Args:
        cluster_api: RayClusterApi instance
        rayjob_name: Name of the RayJob
        namespace: Namespace to check
        timeout: Maximum time to wait in seconds

    Raises:
        TimeoutError: If a matching cluster still exists when the timeout
            is reached (also raised immediately when ``timeout`` is 0).
    """
    poll_seconds = 5
    waited = 0
    # KubeRay creates clusters with pattern: {job_name}-raycluster-{suffix}
    name_prefix = f"{rayjob_name}-raycluster"

    while waited < timeout:
        # List all RayClusters in the namespace
        listing = cluster_api.list_ray_clusters(
            k8s_namespace=namespace, async_req=False
        )

        still_present = any(
            entry.get("metadata", {}).get("name", "").startswith(name_prefix)
            for entry in listing.get("items", [])
        )
        if not still_present:
            # No cluster found, cleanup successful
            return

        sleep(poll_seconds)
        waited += poll_seconds

    raise TimeoutError(
        f"RayCluster for job '{rayjob_name}' was not cleaned up within {timeout} seconds"
    )
b/tests/test_cluster_yamls/kueue/aw_kueue.yaml index 92e5078d..ddc35462 100644 --- a/tests/test_cluster_yamls/kueue/aw_kueue.yaml +++ b/tests/test_cluster_yamls/kueue/aw_kueue.yaml @@ -61,8 +61,8 @@ spec: cpu: 2 memory: 8G requests: - cpu: 2 - memory: 8G + cpu: 1 + memory: 5G volumeMounts: - mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt name: odh-trusted-ca-cert diff --git a/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml b/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml index 04331aed..14ec8227 100644 --- a/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml +++ b/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml @@ -61,8 +61,8 @@ spec: cpu: 2 memory: 8G requests: - cpu: 2 - memory: 8G + cpu: 1 + memory: 5G volumeMounts: - mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt name: odh-trusted-ca-cert diff --git a/tests/test_cluster_yamls/ray/default-appwrapper.yaml b/tests/test_cluster_yamls/ray/default-appwrapper.yaml index 1041f3b5..be15c291 100644 --- a/tests/test_cluster_yamls/ray/default-appwrapper.yaml +++ b/tests/test_cluster_yamls/ray/default-appwrapper.yaml @@ -62,8 +62,8 @@ spec: cpu: 2 memory: 8G requests: - cpu: 2 - memory: 8G + cpu: 1 + memory: 5G volumeMounts: - mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt name: odh-trusted-ca-cert @@ -121,10 +121,10 @@ spec: resources: limits: cpu: 1 - memory: 2G + memory: 6G requests: cpu: 1 - memory: 2G + memory: 3G volumeMounts: - mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt name: odh-trusted-ca-cert diff --git a/tests/test_cluster_yamls/ray/default-ray-cluster.yaml b/tests/test_cluster_yamls/ray/default-ray-cluster.yaml index 213b22cf..774944c6 100644 --- a/tests/test_cluster_yamls/ray/default-ray-cluster.yaml +++ b/tests/test_cluster_yamls/ray/default-ray-cluster.yaml @@ -54,8 +54,8 @@ spec: cpu: 2 memory: 8G requests: - cpu: 2 - memory: 8G + cpu: 1 + memory: 5G volumeMounts: - mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt name: odh-trusted-ca-cert 
@@ -110,10 +110,10 @@ spec: resources: limits: cpu: 1 - memory: 2G + memory: 6G requests: cpu: 1 - memory: 2G + memory: 3G env: - name: RAY_USAGE_STATS_ENABLED value: '0'