diff --git a/chart/templates/rbac/pod-cleanup-role.yaml b/chart/templates/rbac/pod-cleanup-role.yaml index af73df6f61946..5f5b42bcfdca6 100644 --- a/chart/templates/rbac/pod-cleanup-role.yaml +++ b/chart/templates/rbac/pod-cleanup-role.yaml @@ -41,4 +41,11 @@ rules: verbs: - "list" - "delete" + - apiGroups: + - "" + resources: + - "secrets" + verbs: + - "list" + - "delete" {{- end }} diff --git a/chart/templates/rbac/pod-launcher-role.yaml b/chart/templates/rbac/pod-launcher-role.yaml index c6f3a54d19fba..6e2e73be87b78 100644 --- a/chart/templates/rbac/pod-launcher-role.yaml +++ b/chart/templates/rbac/pod-launcher-role.yaml @@ -77,4 +77,11 @@ rules: verbs: - "list" - "watch" + - apiGroups: + - "" + resources: + - "secrets" + verbs: + - "create" + - "patch" {{- end }} diff --git a/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py b/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py index 31006e526253c..85c758af25159 100644 --- a/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py +++ b/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py @@ -446,6 +446,22 @@ def test_global_volumes_and_volume_mounts(self): "emptyDir": {}, } in jmespath.search("spec.jobTemplate.spec.template.spec.volumes", docs[0]) + def test_cleanup_role_includes_secrets_delete_permission(self): + docs = render_chart( + values={ + "executor": "KubernetesExecutor", + "cleanup": {"enabled": True}, + "rbac": {"create": True}, + }, + show_only=["templates/rbac/pod-cleanup-role.yaml"], + ) + + rules = jmespath.search("rules", docs[0]) + assert rules == [ + {"apiGroups": [""], "resources": ["pods"], "verbs": ["list", "delete"]}, + {"apiGroups": [""], "resources": ["secrets"], "verbs": ["list", "delete"]}, + ] + class TestCleanupServiceAccount: """Tests cleanup of service accounts.""" diff --git a/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py b/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py index 72467271d076b..6e346853ce0c3 100644 --- a/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py +++ b/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py @@ -25,16 +25,30 @@ class TestPodLauncher: """Tests RBAC Pod Launcher.""" @pytest.mark.parametrize( - ("rbac_create", "allow_pod_launching", "multi_ns", "expected_kind", "expected_name"), + ( + "rbac_create", + "allow_pod_launching", + "multi_ns", + "expected_kind", + "expected_name", + "expected_secret_verbs", + ), [ - (True, True, False, "Role", "release-name-pod-launcher-role"), - (True, True, True, "ClusterRole", "default-release-name-pod-launcher-role"), - (True, False, False, None, None), - (False, True, False, None, None), + (True, True, False, "Role", "release-name-pod-launcher-role", {"create", "patch"}), + ( + True, + True, + True, + "ClusterRole", + "default-release-name-pod-launcher-role", + {"create", "patch"}, + ), + (True, False, False, None, None, None), + (False, True, False, None, None, None), ], ) def test_pod_launcher_role( - self, rbac_create, allow_pod_launching, multi_ns, expected_kind, expected_name + self, rbac_create, allow_pod_launching, multi_ns, expected_kind, expected_name, expected_secret_verbs ): docs = render_chart( values={ @@ -49,6 +63,10 @@ def test_pod_launcher_role( else: assert docs[0]["kind"] == expected_kind assert docs[0]["metadata"]["name"] == expected_name + rules = jmespath.search("rules", docs[0]) + secrets_rule = next((r for r in rules if "secrets" in r.get("resources", [])), None) + assert secrets_rule is not None + assert set(secrets_rule["verbs"]) == expected_secret_verbs @pytest.mark.parametrize( ( diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py index 2b9a2cba9dc01..ebaf024eeb21d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py @@ -31,7 +31,11 @@ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig from airflow.providers.cncf.kubernetes.kube_client import get_kube_client from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id -from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, generate_pod_command_args +from airflow.providers.cncf.kubernetes.pod_generator import ( + WORKLOAD_SECRET_NAME_PREFIX, + PodGenerator, + generate_pod_command_args, +) from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS from airflow.utils import cli as cli_utils, yaml from airflow.utils.providers_configuration_loader import providers_configuration_loaded @@ -187,6 +191,8 @@ def cleanup_pods(args): break list_kwargs["_continue"] = continue_token + _cleanup_orphaned_workload_secrets(kube_client, namespace) + def _delete_pod(name, namespace): """ @@ -199,3 +205,35 @@ def _delete_pod(name, namespace): print(f'Deleting POD "{name}" from "{namespace}" namespace') api_response = kube_client.delete_namespaced_pod(name=name, namespace=namespace, body=delete_options) print(api_response) + _delete_workload_secret(name, namespace) + + +def _cleanup_orphaned_workload_secrets(kube_client, namespace: str) -> None: + """Delete workload secrets whose owner pod no longer exists.""" + print(f"Checking for orphaned workload secrets in namespace {namespace}") + existing_pods = {pod.metadata.name for pod in kube_client.list_namespaced_pod(namespace=namespace).items} + secret_list = kube_client.list_namespaced_secret( + namespace=namespace, label_selector="airflow-workload-secret=true" + ) + for secret in secret_list.items: + secret_name = secret.metadata.name + pod_name = secret_name.removeprefix(WORKLOAD_SECRET_NAME_PREFIX + "-") + if pod_name not in existing_pods: + print(f'Deleting orphaned workload secret "{secret_name}"') + try: + kube_client.delete_namespaced_secret(name=secret_name, namespace=namespace) + except ApiException as e: + if e.status != 404: + print(f'Failed to delete orphaned workload secret "{secret_name}": {e}', file=sys.stderr) + + +def _delete_workload_secret(pod_name: str, namespace: str) -> None: + """Delete the workload secret associated with a pod if it exists.""" + kube_client = get_kube_client() + secret_name = f"airflow-workload-{pod_name}" + try: + kube_client.delete_namespaced_secret(name=secret_name, namespace=namespace) + print(f'Deleted workload secret "{secret_name}" from "{namespace}" namespace') + except ApiException as e: + if e.status != 404: + print(f'Failed to delete workload secret "{secret_name}": {e}') diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index ed31b43bf2544..a1193eb71f8b6 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -28,6 +28,7 @@ from urllib3.exceptions import ReadTimeoutError from airflow.providers.cncf.kubernetes.backcompat import get_logical_date_key +from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiPermissionError from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( ADOPTED, ALL_NAMESPACES, @@ -44,7 +45,12 @@ annotations_to_key, create_unique_id, ) -from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, workload_to_command_args +from airflow.providers.cncf.kubernetes.pod_generator import ( + WORKLOAD_SECRET_NAME_PREFIX, + PodGenerator, + make_safe_label_value, + workload_to_command_args_json_path, +) from airflow.providers.common.compat.sdk import AirflowException from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState @@ -553,12 +559,50 @@ def run_next(self, next_job: KubernetesJob) -> None: pod_template_file = next_job.pod_template_file dag_id, task_id, run_id, try_number, map_index = key + + pod_id = create_unique_id(dag_id, task_id) + secret_name: str | None = None + if len(command) == 1: from airflow.executors.workloads import ExecuteTask if isinstance(command[0], ExecuteTask): workload = command[0] - command = workload_to_command_args(workload) + secret_name = f"{WORKLOAD_SECRET_NAME_PREFIX}-{pod_id}" + labels: dict[str, str] = { + "airflow-workload-secret": "true", + "dag_id": make_safe_label_value(workload.ti.dag_id), + "task_id": make_safe_label_value(workload.ti.task_id), + "run_id": make_safe_label_value(workload.ti.run_id), + "try_number": str(workload.ti.try_number), + "ti_id": str(workload.ti.id), + } + if workload.ti.map_index is not None and workload.ti.map_index >= 0: + labels["map_index"] = str(workload.ti.map_index) + try: + self.kube_client.create_namespaced_secret( + namespace=self.namespace, + body=client.V1Secret( + metadata=client.V1ObjectMeta( + name=secret_name, + namespace=self.namespace, + labels=labels, + ), + string_data={"workload.json": workload.model_dump_json()}, + ), + ) + except ApiException as e: + if e.status == 403: + raise KubernetesApiPermissionError( + f"Failed to create workload secret '{secret_name}' in namespace " + f"'{self.namespace}': permission denied (HTTP 403). " + "Ensure the RBAC pod-launcher-role has 'create' and 'patch' verbs for " + "'secrets'. If you recently upgraded the cncf-kubernetes provider, " + "update the pod-launcher-role in your helm charts or grant the missing " + "permissions manually." + ) from e + raise + command = workload_to_command_args_json_path() else: raise ValueError( f"KubernetesExecutor doesn't know how to handle workload of type: {type(command[0])}" @@ -576,7 +620,7 @@ def run_next(self, next_job: KubernetesJob) -> None: pod = PodGenerator.construct_pod( namespace=self.namespace, scheduler_job_id=self.scheduler_job_id, - pod_id=create_unique_id(dag_id, task_id), + pod_id=pod_id, dag_id=dag_id, task_id=task_id, kube_image=self.kube_config.kube_image, @@ -588,7 +632,9 @@ def run_next(self, next_job: KubernetesJob) -> None: pod_override_object=kube_executor_config, base_worker_pod=base_worker_pod, with_mutation_hook=True, + workload_secret_name=secret_name, ) + # Reconcile the pod generated by the Operator and the Pod # generated by the .cfg file self.log.info( @@ -600,9 +646,58 @@ def run_next(self, next_job: KubernetesJob) -> None: self.log.debug("Kubernetes running for command %s", command) self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image) - # the watcher will monitor pods, so we do not block. - self.run_pod_async(pod, **self.kube_config.kube_client_request_args) - self.log.debug("Kubernetes Job created!") + try: + resp = self.run_pod_async(pod, **self.kube_config.kube_client_request_args) + except Exception: + if secret_name: + try: + self.kube_client.delete_namespaced_secret(secret_name, self.namespace) + except ApiException: + self.log.debug( + "Failed to clean up workload secret %s after pod creation failure; " + "it will be removed by the cleanup CronJob.", + secret_name, + exc_info=True, + ) + raise + + if secret_name: + try: + self.kube_client.patch_namespaced_secret( + name=secret_name, + namespace=self.namespace, + body={ + "metadata": { + "ownerReferences": [ + { + "apiVersion": "v1", + "kind": "Pod", + "name": resp.metadata.name, + "uid": resp.metadata.uid, + # Pod should not wait on secret to be deleted + "blockOwnerDeletion": False, + } + ] + } + }, + ) + except ApiException as e: + if e.status == 403: + self.log.warning( + "Could not set ownerReference on workload secret %s: permission denied (HTTP 403). " + "Ensure the scheduler's RBAC role grants the 'patch' verb on 'secrets'. " + "If you recently upgraded the cncf-kubernetes provider, update the " + "pod-launcher-role in your Helm chart. " + "The cleanup CronJob will delete the secret as a fallback.", + secret_name, + ) + else: + self.log.warning( + "Could not set ownerReference on workload secret %s; " + "as a fallback the cleanup CronJob will delete it.", + secret_name, + exc_info=True, + ) def delete_pod(self, pod_name: str, namespace: str) -> None: """Delete Pod from a namespace; does not raise if it does not exist.""" @@ -616,7 +711,7 @@ def delete_pod(self, pod_name: str, namespace: str) -> None: ) except ApiException as e: # If the pod is already deleted - if str(e.status) != "404": + if e.status != 404: raise def patch_pod_revoked(self, *, pod_name: str, namespace: str): diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 81f09b0bee426..b9f61308da127 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -971,7 +971,7 @@ async def delete_pod(self, name: str, namespace: str): ) except async_client.ApiException as e: # If the pod is already deleted - if str(e.status) != "404": + if e.status != 404: raise @generic_api_retry diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py index bb7453eaf270c..04916f9e3706b 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py @@ -364,5 +364,5 @@ def delete_spark_job(self, spark_job_name=None): ) except ApiException as e: # If the pod is already deleted - if str(e.status) != "404": + if e.status != 404: raise diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py index 478a35045cc74..b4b658b6913eb 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -54,28 +54,25 @@ if TYPE_CHECKING: import datetime - from airflow.executors import workloads from airflow.models.taskinstance import TaskInstance log = logging.getLogger(__name__) MAX_LABEL_LEN = 63 +WORKLOAD_SECRET_VOLUME_NAME = "airflow-workload" +WORKLOAD_SECRET_NAME_PREFIX = "airflow-workload" +WORKLOAD_SECRET_MOUNT_PATH = "/run/secrets/airflow-workload" +WORKLOAD_JSON_PATH = "/run/secrets/airflow-workload/workload.json" -def workload_to_command_args(workload: workloads.ExecuteTask) -> list[str]: - """ - Convert a workload object to Task SDK command arguments. - :param workload: The ExecuteTask workload to convert - :return: List of command arguments for the Task SDK - """ - ser_input = workload.model_dump_json() +def workload_to_command_args_json_path() -> list[str]: return [ "python", "-m", "airflow.sdk.execution_time.execute_workload", - "--json-string", - ser_input, + "--json-path", + WORKLOAD_JSON_PATH, ] @@ -85,14 +82,10 @@ def generate_pod_command_args(task_instance: TaskInstance) -> list[str]: This function handles backwards compatibility between Airflow 2.x and 3.x: - In Airflow 2.x: Uses the existing ``command_as_list()`` method - - In Airflow 3.x: Uses the Task SDK workload approach with serialized workload + - In Airflow 3.x: Uses the Task SDK workload approach with the workload read from a file """ if AIRFLOW_V_3_0_PLUS: - # In Airflow 3+, use the Task SDK workload approach - from airflow.executors import workloads - - workload = workloads.ExecuteTask.make(task_instance) - return workload_to_command_args(workload) + return workload_to_command_args_json_path() # In Airflow 2.x, use the existing method return task_instance.command_as_list() @@ -328,6 +321,7 @@ def construct_pod( map_index: int = -1, *, with_mutation_hook: bool = False, + workload_secret_name: str | None = None, ) -> k8s.V1Pod: """ Create a Pod. @@ -404,6 +398,25 @@ def construct_pod( except Exception as e: raise PodReconciliationError from e + if workload_secret_name: + if pod.spec.volumes is None: + pod.spec.volumes = [] + pod.spec.volumes.append( + k8s.V1Volume( + name=WORKLOAD_SECRET_VOLUME_NAME, + secret=k8s.V1SecretVolumeSource(secret_name=workload_secret_name), + ) + ) + if pod.spec.containers[0].volume_mounts is None: + pod.spec.containers[0].volume_mounts = [] + pod.spec.containers[0].volume_mounts.append( + k8s.V1VolumeMount( + name=WORKLOAD_SECRET_VOLUME_NAME, + mount_path=WORKLOAD_SECRET_MOUNT_PATH, + read_only=True, + ) + ) + if with_mutation_hook: from airflow.settings import pod_mutation_hook diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 01f8f4a3fa2c6..b057ef48197f1 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -354,7 +354,7 @@ def delete_pod(self, pod: V1Pod) -> None: ) except ApiException as e: # If the pod is already deleted - if str(e.status) != "404": + if e.status != 404: raise @generic_api_retry diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py index 6ae8b81c50f3c..e9660b622dc7d 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py @@ -84,17 +84,24 @@ def setup_class(cls): importlib.reload(cli_parser) cls.parser = cli_parser.get_parser() + @mock.patch("kubernetes.client.CoreV1Api.delete_namespaced_secret") @mock.patch("kubernetes.client.CoreV1Api.delete_namespaced_pod") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") - def test_delete_pod(self, load_incluster_config, delete_namespaced_pod): + def test_delete_pod(self, load_incluster_config, delete_namespaced_pod, delete_namespaced_secret): kubernetes_command._delete_pod("dummy", "awesome-namespace") delete_namespaced_pod.assert_called_with(body=mock.ANY, name="dummy", namespace="awesome-namespace") - load_incluster_config.assert_called_once() + delete_namespaced_secret.assert_called_with( + name="airflow-workload-dummy", namespace="awesome-namespace" + ) + load_incluster_config.assert_called() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") - def test_running_pods_are_not_cleaned(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_running_pods_are_not_cleaned( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -113,10 +120,13 @@ def test_running_pods_are_not_cleaned(self, load_incluster_config, list_namespac delete_pod.assert_not_called() load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") - def test_cleanup_succeeded_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_cleanup_succeeded_pods( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -135,11 +145,12 @@ def test_cleanup_succeeded_pods(self, load_incluster_config, list_namespaced_pod delete_pod.assert_called_with("dummy", "awesome-namespace") load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") def test_no_cleanup_failed_pods_wo_restart_policy_never( - self, load_incluster_config, list_namespaced_pod, delete_pod + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned ): pod1 = MagicMock() pod1.metadata.name = "dummy2" @@ -160,11 +171,12 @@ def test_no_cleanup_failed_pods_wo_restart_policy_never( delete_pod.assert_not_called() load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") def test_cleanup_failed_pods_w_restart_policy_never( - self, load_incluster_config, list_namespaced_pod, delete_pod + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned ): pod1 = MagicMock() pod1.metadata.name = "dummy3" @@ -185,10 +197,13 @@ def test_cleanup_failed_pods_w_restart_policy_never( delete_pod.assert_called_with("dummy3", "awesome-namespace") load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_evicted_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_cleanup_evicted_pods( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy4" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -208,10 +223,13 @@ def test_cleanup_evicted_pods(self, load_incluster_config, list_namespaced_pod, delete_pod.assert_called_with("dummy4", "awesome-namespace") load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_pending_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_cleanup_pending_pods( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy5" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -230,10 +248,13 @@ def test_cleanup_pending_pods(self, load_incluster_config, list_namespaced_pod, delete_pod.assert_called_with("dummy5", "awesome-namespace") load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_api_exception_continue(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_cleanup_api_exception_continue( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): delete_pod.side_effect = kubernetes.client.rest.ApiException(status=0) pod1 = MagicMock() pod1.metadata.name = "dummy" @@ -252,10 +273,13 @@ def test_cleanup_api_exception_continue(self, load_incluster_config, list_namesp ) load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") - def test_list_pod_with_continue_token(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_list_pod_with_continue_token( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -283,3 +307,26 @@ def test_list_pod_with_continue_token(self, load_incluster_config, list_namespac list_namespaced_pod.assert_has_calls(calls) delete_pod.assert_called_with("dummy", "awesome-namespace") load_incluster_config.assert_called_once() + + def test_cleanup_orphaned_secrets_deletes_when_pod_gone(self): + mock_kube_client = MagicMock() + + existing_pod = MagicMock() + existing_pod.metadata.name = "live-pod" + mock_kube_client.list_namespaced_pod.return_value.items = [existing_pod] + + orphaned_secret = MagicMock() + orphaned_secret.metadata.name = "airflow-workload-gone-pod" + live_secret = MagicMock() + live_secret.metadata.name = f"airflow-workload-{existing_pod.metadata.name}" + mock_kube_client.list_namespaced_secret.return_value.items = [orphaned_secret, live_secret] + + kubernetes_command._cleanup_orphaned_workload_secrets(mock_kube_client, "awesome-namespace") + + mock_kube_client.list_namespaced_pod.assert_called_once_with(namespace="awesome-namespace") + mock_kube_client.list_namespaced_secret.assert_called_once_with( + namespace="awesome-namespace", label_selector="airflow-workload-secret=true" + ) + mock_kube_client.delete_namespaced_secret.assert_called_once_with( + name="airflow-workload-gone-pod", namespace="awesome-namespace" + ) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index f8b4a59356f0e..5391a755793a2 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -19,7 +19,9 @@ import random import re import string +import uuid from datetime import datetime +from pathlib import Path from unittest import mock import pytest @@ -52,6 +54,7 @@ create_unique_id, get_logs_task_metadata, ) +from airflow.providers.cncf.kubernetes.pod_generator import WORKLOAD_JSON_PATH from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.standard.operators.empty import EmptyOperator @@ -67,6 +70,7 @@ if AIRFLOW_V_3_0_PLUS: LOGICAL_DATE_KEY = "logical_date" + from airflow.executors.workloads import BundleInfo, ExecuteTask, TaskInstance as WorkloadTaskInstance else: LOGICAL_DATE_KEY = "execution_date" @@ -604,6 +608,122 @@ def test_run_next_pod_reconciliation_error( finally: kubernetes_executor.end() + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="ExecuteTask workload is only available in Airflow 3") + @mock.patch( + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.run_pod_async" + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_run_next_creates_workload_secret_for_execute_task( + self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_run_pod_async, data_file + ): + template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() + mock_kube_client = mock_get_kube_client.return_value + + workload = ExecuteTask( + token="test-token", + ti=WorkloadTaskInstance( + id=uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"), + dag_version_id=uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"), + task_id="test_task", + dag_id="test_dag", + run_id="test_run", + try_number=1, + pool_slots=1, + queue="default", + priority_weight=1, + ), + dag_rel_path=Path("test_dag.py"), + bundle_info=BundleInfo(name="test", version="1.0"), + log_path="test.log", + ) + + mock_run_pod_async.return_value.metadata.name = "test-pod-name" + mock_run_pod_async.return_value.metadata.uid = "test-pod-uid" + + with conf_vars({("kubernetes_executor", "pod_template_file"): template_file}): + kubernetes_executor = self.kubernetes_executor + kubernetes_executor.start() + try: + kubernetes_executor.execute_async(key=workload.ti.key, queue=None, command=[workload]) + kubernetes_executor.sync() + + mock_kube_client.create_namespaced_secret.assert_called_once() + secret_body = mock_kube_client.create_namespaced_secret.call_args[1]["body"] + assert "workload.json" in secret_body.string_data + labels = secret_body.metadata.labels + assert labels["airflow-workload-secret"] == "true" + assert labels["dag_id"] == "test_dag" + assert labels["task_id"] == "test_task" + assert labels["run_id"] == "test_run" + assert labels["try_number"] == "1" + assert labels["ti_id"] == "4d828a62-a417-4936-a7a6-2b3fabacecab" + assert "map_index" not in labels + + mock_run_pod_async.assert_called_once() + pod = mock_run_pod_async.call_args[0][0] + assert "--json-path" in pod.spec.containers[0].args + assert WORKLOAD_JSON_PATH in pod.spec.containers[0].args + + mock_kube_client.patch_namespaced_secret.assert_called_once_with( + name=secret_body.metadata.name, + namespace="default", + body=mock.ANY, + ) + finally: + kubernetes_executor.end() + + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="ExecuteTask workload is only available in Airflow 3") + @mock.patch( + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.run_pod_async" + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_run_next_deletes_secret_if_pod_creation_fails( + self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_run_pod_async, data_file + ): + template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() + mock_kube_client = mock_get_kube_client.return_value + # mock pod creation failure here + mock_run_pod_async.side_effect = ApiException(status=500, reason="Internal Server Error") + + workload = ExecuteTask( + token="test-token", + ti=WorkloadTaskInstance( + id=uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"), + dag_version_id=uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"), + task_id="test_task", + dag_id="test_dag", + run_id="test_run", + try_number=1, + pool_slots=1, + queue="default", + priority_weight=1, + ), + dag_rel_path=Path("test_dag.py"), + bundle_info=BundleInfo(name="test", version="1.0"), + log_path="test.log", + ) + + with conf_vars({("kubernetes_executor", "pod_template_file"): template_file}): + kubernetes_executor = self.kubernetes_executor + kubernetes_executor.start() + try: + kubernetes_executor.execute_async(key=workload.ti.key, queue=None, command=[workload]) + kubernetes_executor.sync() + + mock_kube_client.create_namespaced_secret.assert_called_once() + secret_name = mock_kube_client.create_namespaced_secret.call_args[1]["body"].metadata.name + mock_kube_client.delete_namespaced_secret.assert_called_once_with(secret_name, "default") + finally: + kubernetes_executor.end() + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubeConfig") @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py index 7cefad4409d1f..00d824bab6668 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py @@ -49,15 +49,14 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, create_task_instance): ) if AIRFLOW_V_3_0_PLUS: - from airflow.executors import workloads + from airflow.providers.cncf.kubernetes.pod_generator import WORKLOAD_JSON_PATH - workload = workloads.ExecuteTask.make(ti) rendered_args = [ "python", "-m", "airflow.sdk.execution_time.execute_workload", - "--json-string", - workload.model_dump_json(), + "--json-path", + WORKLOAD_JSON_PATH, ] else: rendered_args = [