From e9c2758222de338d463c82dcae03003d4b7cb8b7 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 18 Feb 2026 20:04:52 +0530 Subject: [PATCH 1/9] Pass KE workload via mounted secret to workers --- chart/templates/rbac/pod-launcher-role.yaml | 8 +++ .../security/test_rbac_pod_launcher.py | 30 ++++++-- .../executors/kubernetes_executor_utils.py | 68 +++++++++++++++++-- .../cncf/kubernetes/pod_generator.py | 23 ++----- .../executors/test_kubernetes_executor.py | 55 +++++++++++++++ .../kubernetes/test_template_rendering.py | 7 +- 6 files changed, 159 insertions(+), 32 deletions(-) diff --git a/chart/templates/rbac/pod-launcher-role.yaml b/chart/templates/rbac/pod-launcher-role.yaml index c6f3a54d19fba..c804fb61914da 100644 --- a/chart/templates/rbac/pod-launcher-role.yaml +++ b/chart/templates/rbac/pod-launcher-role.yaml @@ -77,4 +77,12 @@ rules: verbs: - "list" - "watch" + - apiGroups: + - "" + resources: + - "secrets" + verbs: + - "create" + - "get" + - "delete" {{- end }} diff --git a/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py b/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py index 72467271d076b..ae8726ef32df8 100644 --- a/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py +++ b/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py @@ -25,16 +25,30 @@ class TestPodLauncher: """Tests RBAC Pod Launcher.""" @pytest.mark.parametrize( - ("rbac_create", "allow_pod_launching", "multi_ns", "expected_kind", "expected_name"), + ( + "rbac_create", + "allow_pod_launching", + "multi_ns", + "expected_kind", + "expected_name", + "expected_secret_verbs", + ), [ - (True, True, False, "Role", "release-name-pod-launcher-role"), - (True, True, True, "ClusterRole", "default-release-name-pod-launcher-role"), - (True, False, False, None, None), - (False, True, False, None, None), + (True, True, False, "Role", "release-name-pod-launcher-role", {"create", "get", "delete"}), + ( + True, + True, + True, + "ClusterRole", + "default-release-name-pod-launcher-role", + {"create", "get", "delete"}, + ), + (True, False, False, None, None, None), + (False, True, False, None, None, None), ], ) def test_pod_launcher_role( - self, rbac_create, allow_pod_launching, multi_ns, expected_kind, expected_name + self, rbac_create, allow_pod_launching, multi_ns, expected_kind, expected_name, expected_secret_verbs ): docs = render_chart( values={ @@ -49,6 +63,10 @@ def test_pod_launcher_role( else: assert docs[0]["kind"] == expected_kind assert docs[0]["metadata"]["name"] == expected_name + rules = jmespath.search("rules", docs[0]) + secrets_rule = next((r for r in rules if "secrets" in r.get("resources", [])), None) + assert secrets_rule is not None + assert set(secrets_rule["verbs"]) == expected_secret_verbs @pytest.mark.parametrize( ( diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index ed31b43bf2544..608b5f12854de 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -44,7 +44,11 @@ annotations_to_key, create_unique_id, ) -from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, workload_to_command_args +from airflow.providers.cncf.kubernetes.pod_generator import ( + WORKLOAD_SECRET_VOLUME_NAME, + PodGenerator, + workload_to_command_args_json_path, +) from airflow.providers.common.compat.sdk import AirflowException from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState @@ -553,12 +557,28 @@ def run_next(self, next_job: KubernetesJob) -> None: pod_template_file = next_job.pod_template_file dag_id, task_id, run_id, try_number, map_index = key + + pod_id = create_unique_id(dag_id, task_id) + secret_name = "" + if len(command) == 1: from airflow.executors.workloads import ExecuteTask if isinstance(command[0], ExecuteTask): workload = command[0] - command = workload_to_command_args(workload) + secret_name = f"{WORKLOAD_SECRET_VOLUME_NAME}-{pod_id}" + self.kube_client.create_namespaced_secret( + namespace=self.namespace, + body=client.V1Secret( + metadata=client.V1ObjectMeta( + name=secret_name, + namespace=self.namespace, + labels={"airflow-workload-secret": "true"}, + ), + string_data={"workload.json": workload.model_dump_json()}, + ), + ) + command = workload_to_command_args_json_path() else: raise ValueError( f"KubernetesExecutor doesn't know how to handle workload of type: {type(command[0])}" @@ -576,7 +596,7 @@ def run_next(self, next_job: KubernetesJob) -> None: pod = PodGenerator.construct_pod( namespace=self.namespace, scheduler_job_id=self.scheduler_job_id, - pod_id=create_unique_id(dag_id, task_id), + pod_id=pod_id, dag_id=dag_id, task_id=task_id, kube_image=self.kube_config.kube_image, @@ -589,6 +609,26 @@ def run_next(self, next_job: KubernetesJob) -> None: base_worker_pod=base_worker_pod, with_mutation_hook=True, ) + + if secret_name: + if pod.spec.volumes is None: + pod.spec.volumes = [] + pod.spec.volumes.append( + client.V1Volume( + name=WORKLOAD_SECRET_VOLUME_NAME, + secret=client.V1SecretVolumeSource(secret_name=secret_name), + ) + ) + if pod.spec.containers[0].volume_mounts is None: + pod.spec.containers[0].volume_mounts = [] + pod.spec.containers[0].volume_mounts.append( + client.V1VolumeMount( + name=WORKLOAD_SECRET_VOLUME_NAME, + mount_path="/run/secrets/airflow-workload", + read_only=True, + ) + ) + # Reconcile the pod generated by the Operator and the Pod # generated by the .cfg file self.log.info( @@ -600,9 +640,11 @@ def run_next(self, next_job: KubernetesJob) -> None: self.log.debug("Kubernetes running for command %s", command) self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image) - # the watcher will monitor pods, so we do not block. - self.run_pod_async(pod, **self.kube_config.kube_client_request_args) - self.log.debug("Kubernetes Job created!") + with contextlib.ExitStack() as stack: + if secret_name: + stack.callback(self._delete_workload_secret, secret_name, self.namespace) + self.run_pod_async(pod, **self.kube_config.kube_client_request_args) + stack.pop_all() def delete_pod(self, pod_name: str, namespace: str) -> None: """Delete Pod from a namespace; does not raise if it does not exist.""" @@ -618,6 +660,19 @@ def delete_pod(self, pod_name: str, namespace: str) -> None: # If the pod is already deleted if str(e.status) != "404": raise + self._delete_workload_secret(f"airflow-workload-{pod_name}", namespace) + + def _delete_workload_secret(self, secret_name: str, namespace: str) -> None: + try: + self.kube_client.delete_namespaced_secret(secret_name, namespace) + except ApiException as e: + if str(e.status) != "404": + self.log.warning( + "Failed to delete workload secret %s in namespace %s: %s", + secret_name, + namespace, + e, + ) def patch_pod_revoked(self, *, pod_name: str, namespace: str): """ @@ -650,6 +705,7 @@ def patch_pod_executor_done(self, *, pod_name: str, namespace: str): ) except ApiException as e: self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_name, e) + self._delete_workload_secret(f"airflow-workload-{pod_name}", namespace) def sync(self) -> None: """ diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py index 478a35045cc74..676ef3e4530c8 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -54,28 +54,23 @@ if TYPE_CHECKING: import datetime - from airflow.executors import workloads from airflow.models.taskinstance import TaskInstance log = logging.getLogger(__name__) MAX_LABEL_LEN = 63 +WORKLOAD_SECRET_VOLUME_NAME = "airflow-workload" +WORKLOAD_JSON_PATH = "/run/secrets/airflow-workload/workload.json" -def workload_to_command_args(workload: workloads.ExecuteTask) -> list[str]: - """ - Convert a workload object to Task SDK command arguments. - :param workload: The ExecuteTask workload to convert - :return: List of command arguments for the Task SDK - """ - ser_input = workload.model_dump_json() +def workload_to_command_args_json_path() -> list[str]: return [ "python", "-m", "airflow.sdk.execution_time.execute_workload", - "--json-string", - ser_input, + "--json-path", + WORKLOAD_JSON_PATH, ] @@ -85,14 +80,10 @@ def generate_pod_command_args(task_instance: TaskInstance) -> list[str]: This function handles backwards compatibility between Airflow 2.x and 3.x: - In Airflow 2.x: Uses the existing ``command_as_list()`` method - - In Airflow 3.x: Uses the Task SDK workload approach with serialized workload + - In Airflow 3.x: Uses the Task SDK workload approach with the workload read from a file """ if AIRFLOW_V_3_0_PLUS: - # In Airflow 3+, use the Task SDK workload approach - from airflow.executors import workloads - - workload = workloads.ExecuteTask.make(task_instance) - return workload_to_command_args(workload) + return workload_to_command_args_json_path() # In Airflow 2.x, use the existing method return task_instance.command_as_list() diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index f8b4a59356f0e..13924c52813c1 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -19,7 +19,9 @@ import random import re import string +import uuid from datetime import datetime +from pathlib import Path from unittest import mock import pytest @@ -52,6 +54,7 @@ create_unique_id, get_logs_task_metadata, ) +from airflow.providers.cncf.kubernetes.pod_generator import WORKLOAD_JSON_PATH from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.standard.operators.empty import EmptyOperator @@ -67,6 +70,7 @@ if AIRFLOW_V_3_0_PLUS: LOGICAL_DATE_KEY = "logical_date" + from airflow.executors.workloads import BundleInfo, ExecuteTask, TaskInstance as WorkloadTaskInstance else: LOGICAL_DATE_KEY = "execution_date" @@ -604,6 +608,57 @@ def test_run_next_pod_reconciliation_error( finally: kubernetes_executor.end() + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="ExecuteTask workload is only available in Airflow 3") + @mock.patch( + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.run_pod_async" + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_run_next_creates_secret_and_uses_json_path_for_execute_task( + self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_run_pod_async, data_file + ): + template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() + mock_kube_client = mock_get_kube_client.return_value + + workload = ExecuteTask( + token="test-token", + ti=WorkloadTaskInstance( + id=uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"), + dag_version_id=uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"), + task_id="test_task", + dag_id="test_dag", + run_id="test_run", + try_number=1, + pool_slots=1, + queue="default", + priority_weight=1, + ), + dag_rel_path=Path("test_dag.py"), + bundle_info=BundleInfo(name="test", version="1.0"), + log_path="test.log", + ) + + with conf_vars({("kubernetes_executor", "pod_template_file"): template_file}): + kubernetes_executor = self.kubernetes_executor + kubernetes_executor.start() + try: + kubernetes_executor.execute_async(key=workload.ti.key, queue=None, command=[workload]) + kubernetes_executor.sync() + + mock_kube_client.create_namespaced_secret.assert_called_once() + secret_body = mock_kube_client.create_namespaced_secret.call_args[1]["body"] + assert "workload.json" in secret_body.string_data + + mock_run_pod_async.assert_called_once() + pod = mock_run_pod_async.call_args[0][0] + assert "--json-path" in pod.spec.containers[0].args + assert WORKLOAD_JSON_PATH in pod.spec.containers[0].args + finally: + kubernetes_executor.end() + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubeConfig") @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py index 7cefad4409d1f..00d824bab6668 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py @@ -49,15 +49,14 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, create_task_instance): ) if AIRFLOW_V_3_0_PLUS: - from airflow.executors import workloads + from airflow.providers.cncf.kubernetes.pod_generator import WORKLOAD_JSON_PATH - workload = workloads.ExecuteTask.make(ti) rendered_args = [ "python", "-m", "airflow.sdk.execution_time.execute_workload", - "--json-string", - workload.model_dump_json(), + "--json-path", + WORKLOAD_JSON_PATH, ] else: rendered_args = [ From e7b3b5fc54bcc58c2378f33d023cc246a9847c5a Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 19 Feb 2026 13:37:59 +0530 Subject: [PATCH 2/9] adding secret cleanup to cleanup-pods job --- chart/templates/rbac/pod-cleanup-role.yaml | 6 ++++++ .../helm_tests/airflow_aux/test_cleanup_pods.py | 16 ++++++++++++++++ .../cncf/kubernetes/cli/kubernetes_command.py | 13 +++++++++++++ .../kubernetes/cli/test_kubernetes_command.py | 8 ++++++-- 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/chart/templates/rbac/pod-cleanup-role.yaml b/chart/templates/rbac/pod-cleanup-role.yaml index af73df6f61946..837c67b0398e8 100644 --- a/chart/templates/rbac/pod-cleanup-role.yaml +++ b/chart/templates/rbac/pod-cleanup-role.yaml @@ -41,4 +41,10 @@ rules: verbs: - "list" - "delete" + - apiGroups: + - "" + resources: + - "secrets" + verbs: + - "delete" {{- end }} diff --git a/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py b/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py index 31006e526253c..35fd1306eac66 100644 --- a/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py +++ b/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py @@ -446,6 +446,22 @@ def test_global_volumes_and_volume_mounts(self): "emptyDir": {}, } in jmespath.search("spec.jobTemplate.spec.template.spec.volumes", docs[0]) + def test_cleanup_role_includes_secrets_delete_permission(self): + docs = render_chart( + values={ + "executor": "KubernetesExecutor", + "cleanup": {"enabled": True}, + "rbac": {"create": True}, + }, + show_only=["templates/rbac/pod-cleanup-role.yaml"], + ) + + rules = jmespath.search("rules", docs[0]) + assert rules == [ + {"apiGroups": [""], "resources": ["pods"], "verbs": ["list", "delete"]}, + {"apiGroups": [""], "resources": ["secrets"], "verbs": ["delete"]}, + ] + class TestCleanupServiceAccount: """Tests cleanup of service accounts.""" diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py index 2b9a2cba9dc01..e6c3d3c094962 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py @@ -199,3 +199,16 @@ def _delete_pod(name, namespace): print(f'Deleting POD "{name}" from "{namespace}" namespace') api_response = kube_client.delete_namespaced_pod(name=name, namespace=namespace, body=delete_options) print(api_response) + _delete_workload_secret(name, namespace) + + +def _delete_workload_secret(pod_name, namespace): + """Delete the workload secret associated with a pod if it exists.""" + kube_client = get_kube_client() + secret_name = f"airflow-workload-{pod_name}" + try: + kube_client.delete_namespaced_secret(name=secret_name, namespace=namespace) + print(f'Deleted workload secret "{secret_name}" from "{namespace}" namespace') + except ApiException as e: + if str(e.status) != "404": + print(f'Failed to delete workload secret "{secret_name}": {e}') diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py index 6ae8b81c50f3c..3d9f562a0048b 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py @@ -84,12 +84,16 @@ def setup_class(cls): importlib.reload(cli_parser) cls.parser = cli_parser.get_parser() + @mock.patch("kubernetes.client.CoreV1Api.delete_namespaced_secret") @mock.patch("kubernetes.client.CoreV1Api.delete_namespaced_pod") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") - def test_delete_pod(self, load_incluster_config, delete_namespaced_pod): + def test_delete_pod(self, load_incluster_config, delete_namespaced_pod, delete_namespaced_secret): kubernetes_command._delete_pod("dummy", "awesome-namespace") delete_namespaced_pod.assert_called_with(body=mock.ANY, name="dummy", namespace="awesome-namespace") - load_incluster_config.assert_called_once() + delete_namespaced_secret.assert_called_with( + name="airflow-workload-dummy", namespace="awesome-namespace" + ) + load_incluster_config.assert_called() @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") From b4a137bcb0b95a2d7b033af4d1105334476c70bb Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 19 Feb 2026 13:43:35 +0530 Subject: [PATCH 3/9] add other labels to secret --- .../executors/kubernetes_executor_utils.py | 13 ++++++++++++- .../executors/test_kubernetes_executor.py | 10 +++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 608b5f12854de..9a94cad1cbe7d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -47,6 +47,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import ( WORKLOAD_SECRET_VOLUME_NAME, PodGenerator, + make_safe_label_value, workload_to_command_args_json_path, ) from airflow.providers.common.compat.sdk import AirflowException @@ -567,13 +568,23 @@ def run_next(self, next_job: KubernetesJob) -> None: if isinstance(command[0], ExecuteTask): workload = command[0] secret_name = f"{WORKLOAD_SECRET_VOLUME_NAME}-{pod_id}" + labels: dict[str, str] = { + "airflow-workload-secret": "true", + "dag_id": make_safe_label_value(workload.ti.dag_id), + "task_id": make_safe_label_value(workload.ti.task_id), + "run_id": make_safe_label_value(workload.ti.run_id), + "try_number": str(workload.ti.try_number), + "ti_id": str(workload.ti.id), + } + if workload.ti.map_index is not None and workload.ti.map_index >= 0: + labels["map_index"] = str(workload.ti.map_index) self.kube_client.create_namespaced_secret( namespace=self.namespace, body=client.V1Secret( metadata=client.V1ObjectMeta( name=secret_name, namespace=self.namespace, - labels={"airflow-workload-secret": "true"}, + labels=labels, ), string_data={"workload.json": workload.model_dump_json()}, ), diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 13924c52813c1..46b964bcb74fe 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -617,7 +617,7 @@ def test_run_next_pod_reconciliation_error( ) @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") - def test_run_next_creates_secret_and_uses_json_path_for_execute_task( + def test_run_next_creates_workload_secret_for_execute_task( self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_run_pod_async, data_file ): template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() @@ -651,6 +651,14 @@ def test_run_next_creates_secret_and_uses_json_path_for_execute_task( mock_kube_client.create_namespaced_secret.assert_called_once() secret_body = mock_kube_client.create_namespaced_secret.call_args[1]["body"] assert "workload.json" in secret_body.string_data + labels = secret_body.metadata.labels + assert labels["airflow-workload-secret"] == "true" + assert labels["dag_id"] == "test_dag" + assert labels["task_id"] == "test_task" + assert labels["run_id"] == "test_run" + assert labels["try_number"] == "1" + assert labels["ti_id"] == "4d828a62-a417-4936-a7a6-2b3fabacecab" + assert "map_index" not in labels mock_run_pod_async.assert_called_once() pod = mock_run_pod_async.call_args[0][0] From 0b3d3e9757739f309ff328f49695c31c75e4a758 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 19 Feb 2026 13:54:13 +0530 Subject: [PATCH 4/9] moving mount logic to construct_pod --- .../executors/kubernetes_executor_utils.py | 20 +----------------- .../cncf/kubernetes/pod_generator.py | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 9a94cad1cbe7d..baee8b0cd3542 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -619,27 +619,9 @@ def run_next(self, next_job: KubernetesJob) -> None: pod_override_object=kube_executor_config, base_worker_pod=base_worker_pod, with_mutation_hook=True, + workload_secret_name=secret_name or None, ) - if secret_name: - if pod.spec.volumes is None: - pod.spec.volumes = [] - pod.spec.volumes.append( - client.V1Volume( - name=WORKLOAD_SECRET_VOLUME_NAME, - secret=client.V1SecretVolumeSource(secret_name=secret_name), - ) - ) - if pod.spec.containers[0].volume_mounts is None: - pod.spec.containers[0].volume_mounts = [] - pod.spec.containers[0].volume_mounts.append( - client.V1VolumeMount( - name=WORKLOAD_SECRET_VOLUME_NAME, - mount_path="/run/secrets/airflow-workload", - read_only=True, - ) - ) - # Reconcile the pod generated by the Operator and the Pod # generated by the .cfg file self.log.info( diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py index 676ef3e4530c8..b453bb16aba96 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -61,6 +61,7 @@ MAX_LABEL_LEN = 63 WORKLOAD_SECRET_VOLUME_NAME = "airflow-workload" +WORKLOAD_SECRET_MOUNT_PATH = "/run/secrets/airflow-workload" WORKLOAD_JSON_PATH = "/run/secrets/airflow-workload/workload.json" @@ -319,6 +320,7 @@ def construct_pod( map_index: int = -1, *, with_mutation_hook: bool = False, + workload_secret_name: str | None = None, ) -> k8s.V1Pod: """ Create a Pod. @@ -395,6 +397,25 @@ def construct_pod( except Exception as e: raise PodReconciliationError from e + if workload_secret_name: + if pod.spec.volumes is None: + pod.spec.volumes = [] + pod.spec.volumes.append( + k8s.V1Volume( + name=WORKLOAD_SECRET_VOLUME_NAME, + secret=k8s.V1SecretVolumeSource(secret_name=workload_secret_name), + ) + ) + if pod.spec.containers[0].volume_mounts is None: + pod.spec.containers[0].volume_mounts = [] + pod.spec.containers[0].volume_mounts.append( + k8s.V1VolumeMount( + name=WORKLOAD_SECRET_VOLUME_NAME, + mount_path=WORKLOAD_SECRET_MOUNT_PATH, + read_only=True, + ) + ) + if with_mutation_hook: from airflow.settings import pod_mutation_hook From 0cb0b19b1c8b0a780ca491fe63a8f9ecab728298 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 19 Feb 2026 14:13:08 +0530 Subject: [PATCH 5/9] swapping to ownerRefs --- chart/templates/rbac/pod-launcher-role.yaml | 3 +- .../security/test_rbac_pod_launcher.py | 4 +- .../executors/kubernetes_executor_utils.py | 46 +++++++++++-------- .../executors/test_kubernetes_executor.py | 9 ++++ 4 files changed, 40 insertions(+), 22 deletions(-) diff --git a/chart/templates/rbac/pod-launcher-role.yaml b/chart/templates/rbac/pod-launcher-role.yaml index c804fb61914da..6e2e73be87b78 100644 --- a/chart/templates/rbac/pod-launcher-role.yaml +++ b/chart/templates/rbac/pod-launcher-role.yaml @@ -83,6 +83,5 @@ rules: - "secrets" verbs: - "create" - - "get" - - "delete" + - "patch" {{- end }} diff --git a/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py b/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py index ae8726ef32df8..6e346853ce0c3 100644 --- a/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py +++ b/helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py @@ -34,14 +34,14 @@ class TestPodLauncher: "expected_secret_verbs", ), [ - (True, True, False, "Role", "release-name-pod-launcher-role", {"create", "get", "delete"}), + (True, True, False, "Role", "release-name-pod-launcher-role", {"create", "patch"}), ( True, True, True, "ClusterRole", "default-release-name-pod-launcher-role", - {"create", "get", "delete"}, + {"create", "patch"}, ), (True, False, False, None, None, None), (False, True, False, None, None, None), diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index baee8b0cd3542..ca9fabe437641 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -633,11 +633,34 @@ def run_next(self, next_job: KubernetesJob) -> None: self.log.debug("Kubernetes running for command %s", command) self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image) - with contextlib.ExitStack() as stack: - if secret_name: - stack.callback(self._delete_workload_secret, secret_name, self.namespace) - self.run_pod_async(pod, **self.kube_config.kube_client_request_args) - stack.pop_all() + resp = self.run_pod_async(pod, **self.kube_config.kube_client_request_args) + + if secret_name: + try: + self.kube_client.patch_namespaced_secret( + name=secret_name, + namespace=self.namespace, + body={ + "metadata": { + "ownerReferences": [ + { + "apiVersion": "v1", + "kind": "Pod", + "name": resp.metadata.name, + "uid": resp.metadata.uid, + # Pod should not wait on secret to be deleted + "blockOwnerDeletion": False, + } + ] + } + }, + ) + except ApiException: + self.log.warning( + "Could not set ownerReference on workload secret %s; as a fallback the cleanup CronJob will delete it.", + secret_name, + exc_info=True, + ) def delete_pod(self, pod_name: str, namespace: str) -> None: """Delete Pod from a namespace; does not raise if it does not exist.""" @@ -653,19 +676,6 @@ def delete_pod(self, pod_name: str, namespace: str) -> None: # If the pod is already deleted if str(e.status) != "404": raise - self._delete_workload_secret(f"airflow-workload-{pod_name}", namespace) - - def _delete_workload_secret(self, secret_name: str, namespace: str) -> None: - try: - self.kube_client.delete_namespaced_secret(secret_name, namespace) - except ApiException as e: - if str(e.status) != "404": - self.log.warning( - "Failed to delete workload secret %s in namespace %s: %s", - secret_name, - namespace, - e, - ) def patch_pod_revoked(self, *, pod_name: str, namespace: str): """ diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 46b964bcb74fe..58c37978ead1e 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -641,6 +641,9 @@ def test_run_next_creates_workload_secret_for_execute_task( log_path="test.log", ) + mock_run_pod_async.return_value.metadata.name = "test-pod-name" + mock_run_pod_async.return_value.metadata.uid = "test-pod-uid" + with conf_vars({("kubernetes_executor", "pod_template_file"): template_file}): kubernetes_executor = self.kubernetes_executor kubernetes_executor.start() @@ -664,6 +667,12 @@ def test_run_next_creates_workload_secret_for_execute_task( pod = mock_run_pod_async.call_args[0][0] assert "--json-path" in pod.spec.containers[0].args assert WORKLOAD_JSON_PATH in pod.spec.containers[0].args + + mock_kube_client.patch_namespaced_secret.assert_called_once_with( + name=secret_body.metadata.name, + namespace="default", + body=mock.ANY, + ) finally: kubernetes_executor.end() From 3d773ea8872bad54342a1ada277664d9b0a9bb9d Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 19 Feb 2026 14:18:28 +0530 Subject: [PATCH 6/9] cleanup --- .../cncf/kubernetes/executors/kubernetes_executor_utils.py | 5 ++--- .../src/airflow/providers/cncf/kubernetes/pod_generator.py | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index ca9fabe437641..6cf29b0c0b42f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -45,7 +45,7 @@ create_unique_id, ) from airflow.providers.cncf.kubernetes.pod_generator import ( - WORKLOAD_SECRET_VOLUME_NAME, + WORKLOAD_SECRET_NAME, PodGenerator, make_safe_label_value, workload_to_command_args_json_path, @@ -567,7 +567,7 @@ def run_next(self, next_job: KubernetesJob) -> None: if isinstance(command[0], ExecuteTask): workload = command[0] - secret_name = f"{WORKLOAD_SECRET_VOLUME_NAME}-{pod_id}" + secret_name = f"{WORKLOAD_SECRET_NAME}-{pod_id}" labels: dict[str, str] = { "airflow-workload-secret": "true", "dag_id": make_safe_label_value(workload.ti.dag_id), @@ -708,7 +708,6 @@ def patch_pod_executor_done(self, *, pod_name: str, namespace: str): ) except ApiException as e: self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_name, e) - self._delete_workload_secret(f"airflow-workload-{pod_name}", namespace) def sync(self) -> None: """ diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py index b453bb16aba96..88d21b032dbc9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -61,6 +61,7 @@ MAX_LABEL_LEN = 63 WORKLOAD_SECRET_VOLUME_NAME = "airflow-workload" +WORKLOAD_SECRET_NAME = "airflow-workload" WORKLOAD_SECRET_MOUNT_PATH = "/run/secrets/airflow-workload" WORKLOAD_JSON_PATH = "/run/secrets/airflow-workload/workload.json" From 0fa0a3aec9e123fd618ae821866ca50f73457ea5 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 25 Feb 2026 11:19:34 +0530 Subject: [PATCH 7/9] simpler commits from jed --- .../cncf/kubernetes/cli/kubernetes_command.py | 4 ++-- .../kubernetes/executors/kubernetes_executor_utils.py | 10 +++++----- .../providers/cncf/kubernetes/hooks/kubernetes.py | 2 +- .../kubernetes/operators/custom_object_launcher.py | 2 +- .../airflow/providers/cncf/kubernetes/pod_generator.py | 2 +- .../providers/cncf/kubernetes/utils/pod_manager.py | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py index e6c3d3c094962..fd0210424ee56 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py @@ -202,7 +202,7 @@ def _delete_pod(name, namespace): _delete_workload_secret(name, namespace) -def _delete_workload_secret(pod_name, namespace): +def _delete_workload_secret(pod_name: str, namespace: str) -> None: """Delete the workload secret associated with a pod if it exists.""" kube_client = get_kube_client() secret_name = f"airflow-workload-{pod_name}" @@ -210,5 +210,5 @@ def _delete_workload_secret(pod_name, namespace): kube_client.delete_namespaced_secret(name=secret_name, namespace=namespace) print(f'Deleted workload secret "{secret_name}" from "{namespace}" namespace') except ApiException as e: - if str(e.status) != "404": + if e.status != 404: print(f'Failed to delete workload secret "{secret_name}": {e}') diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 6cf29b0c0b42f..9472a49cd8cb8 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -45,7 +45,7 @@ create_unique_id, ) from airflow.providers.cncf.kubernetes.pod_generator import ( - WORKLOAD_SECRET_NAME, + WORKLOAD_SECRET_NAME_PREFIX, PodGenerator, make_safe_label_value, workload_to_command_args_json_path, @@ -560,14 +560,14 @@ def run_next(self, next_job: KubernetesJob) -> None: dag_id, task_id, run_id, try_number, map_index = key pod_id = create_unique_id(dag_id, task_id) - secret_name = "" + secret_name: str | None = None if len(command) == 1: from airflow.executors.workloads import ExecuteTask if isinstance(command[0], ExecuteTask): workload = command[0] - secret_name = f"{WORKLOAD_SECRET_NAME}-{pod_id}" + secret_name = f"{WORKLOAD_SECRET_NAME_PREFIX}-{pod_id}" labels: dict[str, str] = { "airflow-workload-secret": "true", "dag_id": make_safe_label_value(workload.ti.dag_id), @@ -619,7 +619,7 @@ def run_next(self, next_job: KubernetesJob) -> None: pod_override_object=kube_executor_config, base_worker_pod=base_worker_pod, with_mutation_hook=True, - workload_secret_name=secret_name or None, + workload_secret_name=secret_name, ) # Reconcile the pod generated by the Operator and the Pod @@ -674,7 +674,7 @@ def delete_pod(self, pod_name: str, namespace: str) -> None: ) except ApiException as e: # If the pod is already deleted - if str(e.status) != "404": + if e.status != 404: raise def patch_pod_revoked(self, *, pod_name: str, namespace: str): diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 81f09b0bee426..b9f61308da127 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -971,7 +971,7 @@ async def delete_pod(self, name: str, namespace: str): ) except async_client.ApiException as e: # If the pod is already deleted - if str(e.status) != "404": + if e.status != 404: raise @generic_api_retry diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py index bb7453eaf270c..04916f9e3706b 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py @@ -364,5 +364,5 @@ def delete_spark_job(self, spark_job_name=None): ) except ApiException as e: # If the pod is already deleted - if str(e.status) != "404": + if e.status != 404: raise diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py index 88d21b032dbc9..b4b658b6913eb 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -61,7 +61,7 @@ MAX_LABEL_LEN = 63 WORKLOAD_SECRET_VOLUME_NAME = "airflow-workload" -WORKLOAD_SECRET_NAME = "airflow-workload" +WORKLOAD_SECRET_NAME_PREFIX = "airflow-workload" WORKLOAD_SECRET_MOUNT_PATH = "/run/secrets/airflow-workload" WORKLOAD_JSON_PATH = "/run/secrets/airflow-workload/workload.json" diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 01f8f4a3fa2c6..b057ef48197f1 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -354,7 +354,7 @@ def delete_pod(self, pod: V1Pod) -> None: ) except ApiException as e: # If the pod is already deleted - if str(e.status) != "404": + if e.status != 404: raise @generic_api_retry From c123e2c33c10a1c30a6bc0fa2d80d456810c802d Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 25 Feb 2026 11:49:14 +0530 Subject: [PATCH 8/9] handling comments from jed about safe deletion --- chart/templates/rbac/pod-cleanup-role.yaml | 1 + .../airflow_aux/test_cleanup_pods.py | 2 +- .../cncf/kubernetes/cli/kubernetes_command.py | 27 ++++++++- .../executors/kubernetes_executor_utils.py | 15 ++++- .../kubernetes/cli/test_kubernetes_command.py | 59 ++++++++++++++++--- .../executors/test_kubernetes_executor.py | 48 +++++++++++++++ 6 files changed, 141 insertions(+), 11 deletions(-) diff --git a/chart/templates/rbac/pod-cleanup-role.yaml b/chart/templates/rbac/pod-cleanup-role.yaml index 837c67b0398e8..5f5b42bcfdca6 100644 --- a/chart/templates/rbac/pod-cleanup-role.yaml +++ b/chart/templates/rbac/pod-cleanup-role.yaml @@ -46,5 +46,6 @@ rules: resources: - "secrets" verbs: + - "list" - "delete" {{- end }} diff --git a/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py b/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py index 35fd1306eac66..85c758af25159 100644 --- a/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py +++ b/helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py @@ -459,7 +459,7 @@ def test_cleanup_role_includes_secrets_delete_permission(self): rules = jmespath.search("rules", docs[0]) assert rules == [ {"apiGroups": [""], "resources": ["pods"], "verbs": ["list", "delete"]}, - {"apiGroups": [""], "resources": ["secrets"], "verbs": ["delete"]}, + {"apiGroups": [""], "resources": ["secrets"], "verbs": ["list", "delete"]}, ] diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py index fd0210424ee56..ebaf024eeb21d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py @@ -31,7 +31,11 @@ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig from airflow.providers.cncf.kubernetes.kube_client import get_kube_client from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id -from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, generate_pod_command_args +from airflow.providers.cncf.kubernetes.pod_generator import ( + WORKLOAD_SECRET_NAME_PREFIX, + PodGenerator, + generate_pod_command_args, +) from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS from airflow.utils import cli as cli_utils, yaml from airflow.utils.providers_configuration_loader import providers_configuration_loaded @@ -187,6 +191,8 @@ def cleanup_pods(args): break list_kwargs["_continue"] = continue_token + _cleanup_orphaned_workload_secrets(kube_client, namespace) + def _delete_pod(name, namespace): """ @@ -202,6 +208,25 @@ def _delete_pod(name, namespace): _delete_workload_secret(name, namespace) +def _cleanup_orphaned_workload_secrets(kube_client, namespace: str) -> None: + """Delete workload secrets whose owner pod no longer exists.""" + print(f"Checking for orphaned workload secrets in namespace {namespace}") + existing_pods = {pod.metadata.name for pod in kube_client.list_namespaced_pod(namespace=namespace).items} + secret_list = kube_client.list_namespaced_secret( + namespace=namespace, label_selector="airflow-workload-secret=true" + ) + for secret in secret_list.items: + secret_name = secret.metadata.name + pod_name = secret_name.removeprefix(WORKLOAD_SECRET_NAME_PREFIX + "-") + if pod_name not in existing_pods: + print(f'Deleting orphaned workload secret "{secret_name}"') + try: + kube_client.delete_namespaced_secret(name=secret_name, namespace=namespace) + except ApiException as e: + if e.status != 404: + print(f'Failed to delete orphaned workload secret "{secret_name}": {e}', file=sys.stderr) + + def _delete_workload_secret(pod_name: str, namespace: str) -> None: """Delete the workload secret associated with a pod if it exists.""" kube_client = get_kube_client() diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 9472a49cd8cb8..f9fdc6d7055cc 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -633,7 +633,20 @@ def run_next(self, next_job: KubernetesJob) -> None: self.log.debug("Kubernetes running for command %s", command) self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image) - resp = self.run_pod_async(pod, **self.kube_config.kube_client_request_args) + try: + resp = self.run_pod_async(pod, **self.kube_config.kube_client_request_args) + except Exception: + if secret_name: + try: + self.kube_client.delete_namespaced_secret(secret_name, self.namespace) + except ApiException: + self.log.debug( + "Failed to clean up workload secret %s after pod creation failure; " + "it will be removed by the cleanup CronJob.", + secret_name, + exc_info=True, + ) + raise if secret_name: try: diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py index 3d9f562a0048b..e9660b622dc7d 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/cli/test_kubernetes_command.py @@ -95,10 +95,13 @@ def test_delete_pod(self, load_incluster_config, delete_namespaced_pod, delete_n ) load_incluster_config.assert_called() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") - def test_running_pods_are_not_cleaned(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_running_pods_are_not_cleaned( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -117,10 +120,13 @@ def test_running_pods_are_not_cleaned(self, load_incluster_config, list_namespac delete_pod.assert_not_called() load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") - def test_cleanup_succeeded_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_cleanup_succeeded_pods( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -139,11 +145,12 @@ def test_cleanup_succeeded_pods(self, load_incluster_config, list_namespaced_pod delete_pod.assert_called_with("dummy", "awesome-namespace") load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") def test_no_cleanup_failed_pods_wo_restart_policy_never( - self, load_incluster_config, list_namespaced_pod, delete_pod + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned ): pod1 = MagicMock() pod1.metadata.name = "dummy2" @@ -164,11 +171,12 @@ def test_no_cleanup_failed_pods_wo_restart_policy_never( delete_pod.assert_not_called() load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") def test_cleanup_failed_pods_w_restart_policy_never( - self, load_incluster_config, list_namespaced_pod, delete_pod + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned ): pod1 = MagicMock() pod1.metadata.name = "dummy3" @@ -189,10 +197,13 @@ def test_cleanup_failed_pods_w_restart_policy_never( delete_pod.assert_called_with("dummy3", "awesome-namespace") load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_evicted_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_cleanup_evicted_pods( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy4" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -212,10 +223,13 @@ def test_cleanup_evicted_pods(self, load_incluster_config, list_namespaced_pod, delete_pod.assert_called_with("dummy4", "awesome-namespace") load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_pending_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_cleanup_pending_pods( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy5" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -234,10 +248,13 @@ def test_cleanup_pending_pods(self, load_incluster_config, list_namespaced_pod, delete_pod.assert_called_with("dummy5", "awesome-namespace") load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_api_exception_continue(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_cleanup_api_exception_continue( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): delete_pod.side_effect = kubernetes.client.rest.ApiException(status=0) pod1 = MagicMock() pod1.metadata.name = "dummy" @@ -256,10 +273,13 @@ def test_cleanup_api_exception_continue(self, load_incluster_config, list_namesp ) load_incluster_config.assert_called_once() + @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._cleanup_orphaned_workload_secrets") @mock.patch("airflow.providers.cncf.kubernetes.cli.kubernetes_command._delete_pod") @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") @mock.patch("kubernetes.config.load_incluster_config") - def test_list_pod_with_continue_token(self, load_incluster_config, list_namespaced_pod, delete_pod): + def test_list_pod_with_continue_token( + self, load_incluster_config, list_namespaced_pod, delete_pod, mock_cleanup_orphaned + ): pod1 = MagicMock() pod1.metadata.name = "dummy" pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") @@ -287,3 +307,26 @@ def test_list_pod_with_continue_token(self, load_incluster_config, list_namespac list_namespaced_pod.assert_has_calls(calls) delete_pod.assert_called_with("dummy", "awesome-namespace") load_incluster_config.assert_called_once() + + def test_cleanup_orphaned_secrets_deletes_when_pod_gone(self): + mock_kube_client = MagicMock() + + existing_pod = MagicMock() + existing_pod.metadata.name = "live-pod" + mock_kube_client.list_namespaced_pod.return_value.items = [existing_pod] + + orphaned_secret = MagicMock() + orphaned_secret.metadata.name = "airflow-workload-gone-pod" + live_secret = MagicMock() + live_secret.metadata.name = f"airflow-workload-{existing_pod.metadata.name}" + mock_kube_client.list_namespaced_secret.return_value.items = [orphaned_secret, live_secret] + + kubernetes_command._cleanup_orphaned_workload_secrets(mock_kube_client, "awesome-namespace") + + mock_kube_client.list_namespaced_pod.assert_called_once_with(namespace="awesome-namespace") + mock_kube_client.list_namespaced_secret.assert_called_once_with( + namespace="awesome-namespace", label_selector="airflow-workload-secret=true" + ) + mock_kube_client.delete_namespaced_secret.assert_called_once_with( + name="airflow-workload-gone-pod", namespace="awesome-namespace" + ) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 58c37978ead1e..5391a755793a2 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -676,6 +676,54 @@ def test_run_next_creates_workload_secret_for_execute_task( finally: kubernetes_executor.end() + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" + ) + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="ExecuteTask workload is only available in Airflow 3") + @mock.patch( + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.run_pod_async" + ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_run_next_deletes_secret_if_pod_creation_fails( + self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_run_pod_async, data_file + ): + template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() + mock_kube_client = mock_get_kube_client.return_value + # mock pod creation failure here + mock_run_pod_async.side_effect = ApiException(status=500, reason="Internal Server Error") + + workload = ExecuteTask( + token="test-token", + ti=WorkloadTaskInstance( + id=uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"), + dag_version_id=uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"), + task_id="test_task", + dag_id="test_dag", + run_id="test_run", + try_number=1, + pool_slots=1, + queue="default", + priority_weight=1, + ), + dag_rel_path=Path("test_dag.py"), + bundle_info=BundleInfo(name="test", version="1.0"), + log_path="test.log", + ) + + with conf_vars({("kubernetes_executor", "pod_template_file"): template_file}): + kubernetes_executor = self.kubernetes_executor + kubernetes_executor.start() + try: + kubernetes_executor.execute_async(key=workload.ti.key, queue=None, command=[workload]) + kubernetes_executor.sync() + + mock_kube_client.create_namespaced_secret.assert_called_once() + secret_name = mock_kube_client.create_namespaced_secret.call_args[1]["body"].metadata.name + mock_kube_client.delete_namespaced_secret.assert_called_once_with(secret_name, "default") + finally: + kubernetes_executor.end() + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubeConfig") @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") From f4a1dc94224c58a22ce72a196562d87700e1efc7 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 25 Feb 2026 11:55:34 +0530 Subject: [PATCH 9/9] making error more explicit --- .../executors/kubernetes_executor_utils.py | 56 +++++++++++++------ 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index f9fdc6d7055cc..a1193eb71f8b6 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -28,6 +28,7 @@ from urllib3.exceptions import ReadTimeoutError from airflow.providers.cncf.kubernetes.backcompat import get_logical_date_key +from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiPermissionError from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( ADOPTED, ALL_NAMESPACES, @@ -578,17 +579,29 @@ def run_next(self, next_job: KubernetesJob) -> None: } if workload.ti.map_index is not None and workload.ti.map_index >= 0: labels["map_index"] = str(workload.ti.map_index) - self.kube_client.create_namespaced_secret( - namespace=self.namespace, - body=client.V1Secret( - metadata=client.V1ObjectMeta( - name=secret_name, - namespace=self.namespace, - labels=labels, + try: + self.kube_client.create_namespaced_secret( + namespace=self.namespace, + body=client.V1Secret( + metadata=client.V1ObjectMeta( + name=secret_name, + namespace=self.namespace, + labels=labels, + ), + string_data={"workload.json": workload.model_dump_json()}, ), - string_data={"workload.json": workload.model_dump_json()}, - ), - ) + ) + except ApiException as e: + if e.status == 403: + raise KubernetesApiPermissionError( + f"Failed to create workload secret '{secret_name}' in namespace " + f"'{self.namespace}': permission denied (HTTP 403). " + "Ensure the RBAC pod-launcher-role has 'create' and 'patch' verbs for " + "'secrets'. If you recently upgraded the cncf-kubernetes provider, " + "update the pod-launcher-role in your helm charts or grant the missing " + "permissions manually." + ) from e + raise command = workload_to_command_args_json_path() else: raise ValueError( @@ -668,12 +681,23 @@ def run_next(self, next_job: KubernetesJob) -> None: } }, ) - except ApiException: - self.log.warning( - "Could not set ownerReference on workload secret %s; as a fallback the cleanup CronJob will delete it.", - secret_name, - exc_info=True, - ) + except ApiException as e: + if e.status == 403: + self.log.warning( + "Could not set ownerReference on workload secret %s: permission denied (HTTP 403). " + "Ensure the scheduler's RBAC role grants the 'patch' verb on 'secrets'. " + "If you recently upgraded the cncf-kubernetes provider, update the " + "pod-launcher-role in your Helm chart. " + "The cleanup CronJob will delete the secret as a fallback.", + secret_name, + ) + else: + self.log.warning( + "Could not set ownerReference on workload secret %s; " + "as a fallback the cleanup CronJob will delete it.", + secret_name, + exc_info=True, + ) def delete_pod(self, pod_name: str, namespace: str) -> None: """Delete Pod from a namespace; does not raise if it does not exist."""