Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions cloud_pipelines_backend/launchers/kubernetes_launchers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import copy
import dataclasses
import datetime
import json
import logging
Expand Down Expand Up @@ -74,6 +75,145 @@

_CONTAINER_FILE_NAME = "data"

_KUBERNETES_STUCK_POD_RETRY_MIN_AGE = datetime.timedelta(minutes=5)
_KUBERNETES_STUCK_POD_MAX_RETRY_ATTEMPTS = 1
_KUBERNETES_STUCK_POD_RETRY_METADATA_KEY = "pod_retry"
_KUBERNETES_STUCK_POD_GCSFUSE_SUBPATH_REASON = (
"gcsfuse_subpath_create_container_config_error"
)


@dataclasses.dataclass(frozen=True, kw_only=True)
class _StuckPodClassification:
retryable: bool
reason: str
message: str | None = None


def _find_pod_container_status(
pod: k8s_client_lib.V1Pod, *, container_name: str
) -> k8s_client_lib.V1ContainerStatus | None:
pod_status: k8s_client_lib.V1PodStatus | None = pod.status
if not pod_status or not pod_status.container_statuses:
return None
matching_statuses = [
container_status
for container_status in pod_status.container_statuses
if container_status.name == container_name
]
if len(matching_statuses) != 1:
return None
return matching_statuses[0]


def _normalize_datetime_to_utc(value: datetime.datetime) -> datetime.datetime:
if value.tzinfo is None:
return value.replace(tzinfo=datetime.timezone.utc)
return value.astimezone(datetime.timezone.utc)


def _classify_stuck_unrecoverable_standalone_pod(
pod: k8s_client_lib.V1Pod,
*,
now: datetime.datetime | None = None,
min_age: datetime.timedelta = _KUBERNETES_STUCK_POD_RETRY_MIN_AGE,
) -> _StuckPodClassification:
"""Classify the narrow GCSFuse/subPath setup failure observed in Pending Pods."""
metadata: k8s_client_lib.V1ObjectMeta | None = pod.metadata
if metadata and metadata.owner_references:
return _StuckPodClassification(
retryable=False, reason="pod_has_owner_references"
)

pod_status: k8s_client_lib.V1PodStatus | None = pod.status
if not pod_status or pod_status.phase != "Pending":
return _StuckPodClassification(retryable=False, reason="pod_not_pending")

creation_timestamp = metadata.creation_timestamp if metadata else None
if creation_timestamp is None:
return _StuckPodClassification(
retryable=False, reason="pod_creation_timestamp_missing"
)
now = _normalize_datetime_to_utc(
now or datetime.datetime.now(datetime.timezone.utc)
)
creation_timestamp = _normalize_datetime_to_utc(creation_timestamp)
if now - creation_timestamp < min_age:
return _StuckPodClassification(retryable=False, reason="pod_too_young")

main_container_status = _find_pod_container_status(
pod, container_name=_MAIN_CONTAINER_NAME
)
if main_container_status is None:
return _StuckPodClassification(
retryable=False, reason="main_container_status_missing"
)
if main_container_status.started is True:
return _StuckPodClassification(
retryable=False, reason="main_container_already_started"
)
if (main_container_status.restart_count or 0) > 0:
return _StuckPodClassification(
retryable=False, reason="main_container_restarted"
)
last_state: k8s_client_lib.V1ContainerState | None = (
main_container_status.last_state
)
if last_state and (last_state.running or last_state.terminated):
return _StuckPodClassification(
retryable=False, reason="main_container_has_previous_state"
)

state: k8s_client_lib.V1ContainerState | None = main_container_status.state
waiting_state = state.waiting if state else None
if not waiting_state:
return _StuckPodClassification(
retryable=False, reason="main_container_not_waiting"
)
if waiting_state.reason != "CreateContainerConfigError":
return _StuckPodClassification(
retryable=False, reason="main_container_waiting_reason_not_allowlisted"
)

message = waiting_state.message or ""
normalized_message = message.lower()
if "failed to prepare subpath" not in normalized_message:
return _StuckPodClassification(
retryable=False, reason="main_container_waiting_message_not_subpath"
)
if "gcsfuse" not in normalized_message:
return _StuckPodClassification(
retryable=False, reason="main_container_waiting_message_not_gcsfuse"
)

return _StuckPodClassification(
retryable=True,
reason=_KUBERNETES_STUCK_POD_GCSFUSE_SUBPATH_REASON,
message=message,
)


def _build_replacement_pod_for_stuck_setup_retry(
pod: k8s_client_lib.V1Pod,
) -> k8s_client_lib.V1Pod:
replacement_pod = copy.deepcopy(pod)
old_metadata: k8s_client_lib.V1ObjectMeta | None = pod.metadata
old_name = old_metadata.name if old_metadata else None
generate_name = old_metadata.generate_name if old_metadata else None
if not generate_name:
generate_name = f"{(old_name or 'task-pod')[:50]}-retry-"

replacement_pod.metadata = k8s_client_lib.V1ObjectMeta(
generate_name=generate_name,
namespace=old_metadata.namespace if old_metadata else None,
labels=copy.deepcopy(old_metadata.labels) if old_metadata else None,
annotations=copy.deepcopy(old_metadata.annotations) if old_metadata else None,
)
replacement_pod.status = None
if replacement_pod.spec:
replacement_pod.spec.node_name = None
return replacement_pod


def _create_volume_and_volume_mount_host_path(
container_path: str,
Expand Down Expand Up @@ -751,6 +891,7 @@ def __init__(
debug_pod: k8s_client_lib.V1Pod,
cluster_server: str | None = None,
launcher: _KubernetesPodLauncher | None = None,
retry_metadata: dict[str, Any] | None = None,
):
self._pod_name = pod_name
self._namespace = namespace
Expand All @@ -759,6 +900,7 @@ def __init__(
self._debug_pod = debug_pod
self._cluster_server = cluster_server
self._launcher = launcher
self._retry_metadata = retry_metadata or {}

def _get_launcher(self):
if not self._launcher:
Expand Down Expand Up @@ -803,10 +945,22 @@ def _get_main_container_terminated_state(
# def id(self) -> str:
# return self.pod_name

def _get_pod_setup_retry_attempts(self) -> int:
return int((self._retry_metadata or {}).get("attempts", 0) or 0)

@property
def status(self) -> interfaces.ContainerStatus:
phase_str = self._debug_pod.status.phase
if phase_str == "Pending":
classification = _classify_stuck_unrecoverable_standalone_pod(
self._debug_pod
)
if (
classification.retryable
and self._get_pod_setup_retry_attempts()
>= _KUBERNETES_STUCK_POD_MAX_RETRY_ATTEMPTS
):
return interfaces.ContainerStatus.ERROR
return interfaces.ContainerStatus.PENDING
elif phase_str == "Running":
return interfaces.ContainerStatus.RUNNING
Expand Down Expand Up @@ -888,6 +1042,10 @@ def to_dict(self) -> dict[str, Any]:
debug_pod=pod_dict,
),
)
if self._retry_metadata:
result["kubernetes"][_KUBERNETES_STUCK_POD_RETRY_METADATA_KEY] = (
copy.deepcopy(self._retry_metadata)
)
return result

@classmethod
Expand All @@ -905,6 +1063,7 @@ def from_dict(
log_uri=d["log_uri"],
debug_pod=debug_pod,
launcher=launcher,
retry_metadata=d.get(_KUBERNETES_STUCK_POD_RETRY_METADATA_KEY) or {},
)

def get_refreshed(self) -> "LaunchedKubernetesContainer":
Expand All @@ -917,8 +1076,100 @@ def get_refreshed(self) -> "LaunchedKubernetesContainer":
)
new_launched_container = copy.copy(self)
new_launched_container._debug_pod = pod

classification = _classify_stuck_unrecoverable_standalone_pod(pod)
if (
classification.retryable
and new_launched_container._get_pod_setup_retry_attempts()
< _KUBERNETES_STUCK_POD_MAX_RETRY_ATTEMPTS
):
return new_launched_container._retry_stuck_pod_with_replacement(
pod=pod,
classification=classification,
core_api_client=core_api_client,
)
return new_launched_container

def _retry_stuck_pod_with_replacement(
self,
*,
pod: k8s_client_lib.V1Pod,
classification: _StuckPodClassification,
core_api_client: k8s_client_lib.CoreV1Api,
) -> "LaunchedKubernetesContainer":
launcher = self._get_launcher()
retry_attempt = self._get_pod_setup_retry_attempts() + 1
old_metadata: k8s_client_lib.V1ObjectMeta | None = pod.metadata
old_pod_uid = old_metadata.uid if old_metadata else None
_logger.warning(
"Retrying stuck Kubernetes pod %s/%s due to %s. "
"Deleting the old pod can make live pod logs unavailable unless they "
"were already collected by cluster logging.",
self._namespace,
self._pod_name,
classification.reason,
)
try:
core_api_client.delete_namespaced_pod(
name=self._pod_name,
namespace=self._namespace,
grace_period_seconds=10,
_request_timeout=launcher._request_timeout,
)
except kubernetes.client.exceptions.ApiException as ex:
if ex.status != 404:
raise
_logger.warning(
"Pod %s/%s was already deleted before retry replacement.",
self._namespace,
self._pod_name,
)

replacement_pod = _build_replacement_pod_for_stuck_setup_retry(pod)
created_pod: k8s_client_lib.V1Pod = core_api_client.create_namespaced_pod(
namespace=self._namespace,
body=replacement_pod,
_request_timeout=launcher._request_timeout,
)
created_pod_name: str = created_pod.metadata.name
created_pod_namespace: str = created_pod.metadata.namespace or self._namespace
_logger.info(
"Created replacement Kubernetes pod %s/%s for stuck pod %s/%s.",
created_pod_namespace,
created_pod_name,
self._namespace,
self._pod_name,
)

retry_metadata = copy.deepcopy(self._retry_metadata)
retry_history = list(retry_metadata.get("history", []))
retry_history.append(
dict(
attempt=retry_attempt,
reason=classification.reason,
message=classification.message,
old_pod_name=self._pod_name,
old_pod_uid=old_pod_uid,
replacement_pod_name=created_pod_name,
)
)
retry_metadata.update(
attempts=retry_attempt,
max_attempts=_KUBERNETES_STUCK_POD_MAX_RETRY_ATTEMPTS,
last_reason=classification.reason,
history=retry_history,
)
return LaunchedKubernetesContainer(
pod_name=created_pod_name,
namespace=created_pod_namespace,
output_uris=self._output_uris,
log_uri=self._log_uri,
debug_pod=created_pod,
cluster_server=self._cluster_server,
launcher=self._launcher,
retry_metadata=retry_metadata,
)

def get_log(self) -> str:
launcher = self._get_launcher()
core_api_client = k8s_client_lib.CoreV1Api(api_client=launcher._api_client)
Expand Down
Loading
Loading