From 1a97856d91dca356625148216e2ac1488a205c26 Mon Sep 17 00:00:00 2001 From: Volv G Date: Wed, 24 Jun 2026 04:50:34 -0700 Subject: [PATCH] fix: retry stuck gcsfuse pod setup failures Assisted-By: devx/ba25adb1-db34-47af-bfef-ecf43bc6627a --- .../launchers/kubernetes_launchers.py | 251 ++++++++++++++++++ tests/test_kubernetes_pod_retry.py | 191 +++++++++++++ 2 files changed, 442 insertions(+) create mode 100644 tests/test_kubernetes_pod_retry.py diff --git a/cloud_pipelines_backend/launchers/kubernetes_launchers.py b/cloud_pipelines_backend/launchers/kubernetes_launchers.py index 25cd267b..da768a87 100644 --- a/cloud_pipelines_backend/launchers/kubernetes_launchers.py +++ b/cloud_pipelines_backend/launchers/kubernetes_launchers.py @@ -1,6 +1,7 @@ from __future__ import annotations import copy +import dataclasses import datetime import json import logging @@ -74,6 +75,145 @@ _CONTAINER_FILE_NAME = "data" +_KUBERNETES_STUCK_POD_RETRY_MIN_AGE = datetime.timedelta(minutes=5) +_KUBERNETES_STUCK_POD_MAX_RETRY_ATTEMPTS = 1 +_KUBERNETES_STUCK_POD_RETRY_METADATA_KEY = "pod_retry" +_KUBERNETES_STUCK_POD_GCSFUSE_SUBPATH_REASON = ( + "gcsfuse_subpath_create_container_config_error" +) + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class _StuckPodClassification: + retryable: bool + reason: str + message: str | None = None + + +def _find_pod_container_status( + pod: k8s_client_lib.V1Pod, *, container_name: str +) -> k8s_client_lib.V1ContainerStatus | None: + pod_status: k8s_client_lib.V1PodStatus | None = pod.status + if not pod_status or not pod_status.container_statuses: + return None + matching_statuses = [ + container_status + for container_status in pod_status.container_statuses + if container_status.name == container_name + ] + if len(matching_statuses) != 1: + return None + return matching_statuses[0] + + +def _normalize_datetime_to_utc(value: datetime.datetime) -> datetime.datetime: + if value.tzinfo is None: + return value.replace(tzinfo=datetime.timezone.utc) + return value.astimezone(datetime.timezone.utc) + + +def _classify_stuck_unrecoverable_standalone_pod( + pod: k8s_client_lib.V1Pod, + *, + now: datetime.datetime | None = None, + min_age: datetime.timedelta = _KUBERNETES_STUCK_POD_RETRY_MIN_AGE, +) -> _StuckPodClassification: + """Classify the narrow GCSFuse/subPath setup failure observed in Pending Pods.""" + metadata: k8s_client_lib.V1ObjectMeta | None = pod.metadata + if metadata and metadata.owner_references: + return _StuckPodClassification( + retryable=False, reason="pod_has_owner_references" + ) + + pod_status: k8s_client_lib.V1PodStatus | None = pod.status + if not pod_status or pod_status.phase != "Pending": + return _StuckPodClassification(retryable=False, reason="pod_not_pending") + + creation_timestamp = metadata.creation_timestamp if metadata else None + if creation_timestamp is None: + return _StuckPodClassification( + retryable=False, reason="pod_creation_timestamp_missing" + ) + now = _normalize_datetime_to_utc( + now or datetime.datetime.now(datetime.timezone.utc) + ) + creation_timestamp = _normalize_datetime_to_utc(creation_timestamp) + if now - creation_timestamp < min_age: + return _StuckPodClassification(retryable=False, reason="pod_too_young") + + main_container_status = _find_pod_container_status( + pod, container_name=_MAIN_CONTAINER_NAME + ) + if main_container_status is None: + return _StuckPodClassification( + retryable=False, reason="main_container_status_missing" + ) + if main_container_status.started is True: + return _StuckPodClassification( + retryable=False, reason="main_container_already_started" + ) + if (main_container_status.restart_count or 0) > 0: + return _StuckPodClassification( + retryable=False, reason="main_container_restarted" + ) + last_state: k8s_client_lib.V1ContainerState | None = ( + main_container_status.last_state + ) + if last_state and (last_state.running or last_state.terminated): + return _StuckPodClassification( + retryable=False, reason="main_container_has_previous_state" + ) + + state: k8s_client_lib.V1ContainerState | None = main_container_status.state + waiting_state = state.waiting if state else None + if not waiting_state: + return _StuckPodClassification( + retryable=False, reason="main_container_not_waiting" + ) + if waiting_state.reason != "CreateContainerConfigError": + return _StuckPodClassification( + retryable=False, reason="main_container_waiting_reason_not_allowlisted" + ) + + message = waiting_state.message or "" + normalized_message = message.lower() + if "failed to prepare subpath" not in normalized_message: + return _StuckPodClassification( + retryable=False, reason="main_container_waiting_message_not_subpath" + ) + if "gcsfuse" not in normalized_message: + return _StuckPodClassification( + retryable=False, reason="main_container_waiting_message_not_gcsfuse" + ) + + return _StuckPodClassification( + retryable=True, + reason=_KUBERNETES_STUCK_POD_GCSFUSE_SUBPATH_REASON, + message=message, + ) + + +def _build_replacement_pod_for_stuck_setup_retry( + pod: k8s_client_lib.V1Pod, +) -> k8s_client_lib.V1Pod: + replacement_pod = copy.deepcopy(pod) + old_metadata: k8s_client_lib.V1ObjectMeta | None = pod.metadata + old_name = old_metadata.name if old_metadata else None + generate_name = old_metadata.generate_name if old_metadata else None + if not generate_name: + generate_name = f"{(old_name or 'task-pod')[:50]}-retry-" + + replacement_pod.metadata = k8s_client_lib.V1ObjectMeta( + generate_name=generate_name, + namespace=old_metadata.namespace if old_metadata else None, + labels=copy.deepcopy(old_metadata.labels) if old_metadata else None, + annotations=copy.deepcopy(old_metadata.annotations) if old_metadata else None, + ) + replacement_pod.status = None + if replacement_pod.spec: + replacement_pod.spec.node_name = None + return replacement_pod + def _create_volume_and_volume_mount_host_path( container_path: str, @@ -751,6 +891,7 @@ def __init__( debug_pod: k8s_client_lib.V1Pod, cluster_server: str | None = None, launcher: _KubernetesPodLauncher | None = None, + retry_metadata: dict[str, Any] | None = None, ): self._pod_name = pod_name self._namespace = namespace @@ -759,6 +900,7 @@ def __init__( self._debug_pod = debug_pod self._cluster_server = cluster_server self._launcher = launcher + self._retry_metadata = retry_metadata or {} def _get_launcher(self): if not self._launcher: @@ -803,10 +945,22 @@ def _get_main_container_terminated_state( # def id(self) -> str: # return self.pod_name + def _get_pod_setup_retry_attempts(self) -> int: + return int((self._retry_metadata or {}).get("attempts", 0) or 0) + @property def status(self) -> interfaces.ContainerStatus: phase_str = self._debug_pod.status.phase if phase_str == "Pending": + classification = _classify_stuck_unrecoverable_standalone_pod( + self._debug_pod + ) + if ( + classification.retryable + and self._get_pod_setup_retry_attempts() + >= _KUBERNETES_STUCK_POD_MAX_RETRY_ATTEMPTS + ): + return interfaces.ContainerStatus.ERROR return interfaces.ContainerStatus.PENDING elif phase_str == "Running": return interfaces.ContainerStatus.RUNNING @@ -888,6 +1042,10 @@ def to_dict(self) -> dict[str, Any]: debug_pod=pod_dict, ), ) + if self._retry_metadata: + result["kubernetes"][_KUBERNETES_STUCK_POD_RETRY_METADATA_KEY] = ( + copy.deepcopy(self._retry_metadata) + ) return result @classmethod @@ -905,6 +1063,7 @@ def from_dict( log_uri=d["log_uri"], debug_pod=debug_pod, launcher=launcher, + retry_metadata=d.get(_KUBERNETES_STUCK_POD_RETRY_METADATA_KEY) or {}, ) def get_refreshed(self) -> "LaunchedKubernetesContainer": @@ -917,8 +1076,100 @@ def get_refreshed(self) -> "LaunchedKubernetesContainer": ) new_launched_container = copy.copy(self) new_launched_container._debug_pod = pod + + classification = _classify_stuck_unrecoverable_standalone_pod(pod) + if ( + classification.retryable + and new_launched_container._get_pod_setup_retry_attempts() + < _KUBERNETES_STUCK_POD_MAX_RETRY_ATTEMPTS + ): + return new_launched_container._retry_stuck_pod_with_replacement( + pod=pod, + classification=classification, + core_api_client=core_api_client, + ) return new_launched_container + def _retry_stuck_pod_with_replacement( + self, + *, + pod: k8s_client_lib.V1Pod, + classification: _StuckPodClassification, + core_api_client: k8s_client_lib.CoreV1Api, + ) -> "LaunchedKubernetesContainer": + launcher = self._get_launcher() + retry_attempt = self._get_pod_setup_retry_attempts() + 1 + old_metadata: k8s_client_lib.V1ObjectMeta | None = pod.metadata + old_pod_uid = old_metadata.uid if old_metadata else None + _logger.warning( + "Retrying stuck Kubernetes pod %s/%s due to %s. " + "Deleting the old pod can make live pod logs unavailable unless they " + "were already collected by cluster logging.", + self._namespace, + self._pod_name, + classification.reason, + ) + try: + core_api_client.delete_namespaced_pod( + name=self._pod_name, + namespace=self._namespace, + grace_period_seconds=10, + _request_timeout=launcher._request_timeout, + ) + except kubernetes.client.exceptions.ApiException as ex: + if ex.status != 404: + raise + _logger.warning( + "Pod %s/%s was already deleted before retry replacement.", + self._namespace, + self._pod_name, + ) + + replacement_pod = _build_replacement_pod_for_stuck_setup_retry(pod) + created_pod: k8s_client_lib.V1Pod = core_api_client.create_namespaced_pod( + namespace=self._namespace, + body=replacement_pod, + _request_timeout=launcher._request_timeout, + ) + created_pod_name: str = created_pod.metadata.name + created_pod_namespace: str = created_pod.metadata.namespace or self._namespace + _logger.info( + "Created replacement Kubernetes pod %s/%s for stuck pod %s/%s.", + created_pod_namespace, + created_pod_name, + self._namespace, + self._pod_name, + ) + + retry_metadata = copy.deepcopy(self._retry_metadata) + retry_history = list(retry_metadata.get("history", [])) + retry_history.append( + dict( + attempt=retry_attempt, + reason=classification.reason, + message=classification.message, + old_pod_name=self._pod_name, + old_pod_uid=old_pod_uid, + replacement_pod_name=created_pod_name, + ) + ) + retry_metadata.update( + attempts=retry_attempt, + max_attempts=_KUBERNETES_STUCK_POD_MAX_RETRY_ATTEMPTS, + last_reason=classification.reason, + history=retry_history, + ) + return LaunchedKubernetesContainer( + pod_name=created_pod_name, + namespace=created_pod_namespace, + output_uris=self._output_uris, + log_uri=self._log_uri, + debug_pod=created_pod, + cluster_server=self._cluster_server, + launcher=self._launcher, + retry_metadata=retry_metadata, + ) + def get_log(self) -> str: launcher = self._get_launcher() core_api_client = k8s_client_lib.CoreV1Api(api_client=launcher._api_client) diff --git a/tests/test_kubernetes_pod_retry.py b/tests/test_kubernetes_pod_retry.py new file mode 100644 index 00000000..77c79c4c --- /dev/null +++ b/tests/test_kubernetes_pod_retry.py @@ -0,0 +1,191 @@ +import datetime +from types import SimpleNamespace + +from kubernetes import client as k8s + +from cloud_pipelines_backend.launchers import interfaces +from cloud_pipelines_backend.launchers import kubernetes_launchers as k8sL + +_OBSERVED_GCSFUSE_SUBPATH_MESSAGE = ( + 'failed to prepare subPath for volumeMount "gcsfuse-prd-oasis-tmp" ' + 'of container "main"' +) + + +def _make_pending_pod( + *, + waiting_reason: str = "CreateContainerConfigError", + waiting_message: str = _OBSERVED_GCSFUSE_SUBPATH_MESSAGE, + creation_timestamp: datetime.datetime | None = None, + started: bool | None = False, + restart_count: int = 0, + last_state: k8s.V1ContainerState | None = None, + owner_references: list[k8s.V1OwnerReference] | None = None, +) -> k8s.V1Pod: + creation_timestamp = creation_timestamp or ( + datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=10) + ) + return k8s.V1Pod( + metadata=k8s.V1ObjectMeta( + name="task-abc-old", + namespace="default", + uid="old-uid", + generate_name="task-abc-", + labels={"app": "oasis"}, + annotations={"cloud-pipelines.net": "true"}, + creation_timestamp=creation_timestamp, + owner_references=owner_references, + ), + spec=k8s.V1PodSpec( + containers=[k8s.V1Container(name="main", image="example/image")], + node_name="old-node", + ), + status=k8s.V1PodStatus( + phase="Pending", + container_statuses=[ + k8s.V1ContainerStatus( + name="main", + image="example/image", + image_id="", + ready=False, + restart_count=restart_count, + started=started, + last_state=last_state, + state=k8s.V1ContainerState( + waiting=k8s.V1ContainerStateWaiting( + reason=waiting_reason, + message=waiting_message, + ) + ), + ) + ], + ), + ) + + +def test_stuck_pod_classifier_matches_observed_gcsfuse_subpath_error(): + classification = k8sL._classify_stuck_unrecoverable_standalone_pod( + _make_pending_pod() + ) + + assert classification.retryable is True + assert classification.reason == k8sL._KUBERNETES_STUCK_POD_GCSFUSE_SUBPATH_REASON + assert "gcsfuse" in (classification.message or "") + + +def test_stuck_pod_classifier_is_narrowly_allowlisted(): + image_pull = k8sL._classify_stuck_unrecoverable_standalone_pod( + _make_pending_pod( + waiting_reason="ImagePullBackOff", + waiting_message="failed to pull image", + ) + ) + assert image_pull.retryable is False + + already_started = k8sL._classify_stuck_unrecoverable_standalone_pod( + _make_pending_pod(started=True) + ) + assert already_started.retryable is False + + too_young = k8sL._classify_stuck_unrecoverable_standalone_pod( + _make_pending_pod( + creation_timestamp=datetime.datetime.now(datetime.timezone.utc) + ) + ) + assert too_young.retryable is False + + owned_by_job = k8sL._classify_stuck_unrecoverable_standalone_pod( + _make_pending_pod( + owner_references=[ + k8s.V1OwnerReference( + api_version="batch/v1", + kind="Job", + name="job", + uid="job-uid", + ) + ] + ) + ) + assert owned_by_job.retryable is False + + +def test_get_refreshed_replaces_stuck_standalone_pod_once(monkeypatch): + old_pod = _make_pending_pod() + + class FakeCoreV1Api: + def __init__(self): + self.deleted = [] + self.created_body = None + + def read_namespaced_pod(self, *, name, namespace, _request_timeout): + assert name == "task-abc-old" + assert namespace == "default" + return old_pod + + def delete_namespaced_pod( + self, *, name, namespace, grace_period_seconds, _request_timeout + ): + self.deleted.append( + dict( + name=name, + namespace=namespace, + grace_period_seconds=grace_period_seconds, + ) + ) + + def create_namespaced_pod(self, *, namespace, body, _request_timeout): + self.created_body = body + body.metadata.name = "task-abc-retry" + body.metadata.namespace = namespace + body.metadata.uid = "retry-uid" + body.metadata.creation_timestamp = datetime.datetime.now( + datetime.timezone.utc + ) + body.status = k8s.V1PodStatus(phase="Pending") + return body + + fake_api = FakeCoreV1Api() + monkeypatch.setattr(k8sL.k8s_client_lib, "CoreV1Api", lambda api_client: fake_api) + + launched = k8sL.LaunchedKubernetesContainer( + pod_name="task-abc-old", + namespace="default", + output_uris={}, + log_uri="gs://logs/log.txt", + debug_pod=old_pod, + launcher=SimpleNamespace(_api_client=object(), _request_timeout=10), + ) + + refreshed = launched.get_refreshed() + + assert fake_api.deleted == [ + dict(name="task-abc-old", namespace="default", grace_period_seconds=10) + ] + assert fake_api.created_body.metadata.name == "task-abc-retry" + assert fake_api.created_body.metadata.generate_name == "task-abc-" + assert fake_api.created_body.spec.node_name is None + assert fake_api.created_body.status.phase == "Pending" + assert refreshed.status == interfaces.ContainerStatus.PENDING + + serialized = refreshed.to_dict()["kubernetes"] + assert serialized["pod_name"] == "task-abc-retry" + assert serialized[k8sL._KUBERNETES_STUCK_POD_RETRY_METADATA_KEY]["attempts"] == 1 + assert ( + serialized[k8sL._KUBERNETES_STUCK_POD_RETRY_METADATA_KEY]["history"][0][ + "old_pod_name" + ] + == "task-abc-old" + ) + + +def test_retry_metadata_exhaustion_marks_same_stuck_state_error(): + launched = k8sL.LaunchedKubernetesContainer( + pod_name="task-abc-old", + namespace="default", + output_uris={}, + log_uri="gs://logs/log.txt", + debug_pod=_make_pending_pod(), + retry_metadata={"attempts": k8sL._KUBERNETES_STUCK_POD_MAX_RETRY_ATTEMPTS}, + ) + + assert launched.status == interfaces.ContainerStatus.ERROR