Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions chart/templates/rbac/pod-cleanup-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ rules:
verbs:
- "list"
- "delete"
- apiGroups:
- ""
resources:
- "secrets"
verbs:
- "list"
- "delete"
{{- end }}
7 changes: 7 additions & 0 deletions chart/templates/rbac/pod-launcher-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,11 @@ rules:
verbs:
- "list"
- "watch"
- apiGroups:
- ""
resources:
- "secrets"
verbs:
- "create"
- "patch"
{{- end }}
16 changes: 16 additions & 0 deletions helm-tests/tests/helm_tests/airflow_aux/test_cleanup_pods.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,22 @@ def test_global_volumes_and_volume_mounts(self):
"emptyDir": {},
} in jmespath.search("spec.jobTemplate.spec.template.spec.volumes", docs[0])

def test_cleanup_role_includes_secrets_delete_permission(self):
    """The rendered cleanup Role must grant list/delete on secrets as well as pods."""
    chart_values = {
        "executor": "KubernetesExecutor",
        "cleanup": {"enabled": True},
        "rbac": {"create": True},
    }
    docs = render_chart(
        values=chart_values,
        show_only=["templates/rbac/pod-cleanup-role.yaml"],
    )

    expected_rules = [
        {"apiGroups": [""], "resources": ["pods"], "verbs": ["list", "delete"]},
        {"apiGroups": [""], "resources": ["secrets"], "verbs": ["list", "delete"]},
    ]
    assert jmespath.search("rules", docs[0]) == expected_rules


class TestCleanupServiceAccount:
"""Tests cleanup of service accounts."""
Expand Down
30 changes: 24 additions & 6 deletions helm-tests/tests/helm_tests/security/test_rbac_pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,30 @@ class TestPodLauncher:
"""Tests RBAC Pod Launcher."""

@pytest.mark.parametrize(
("rbac_create", "allow_pod_launching", "multi_ns", "expected_kind", "expected_name"),
(
"rbac_create",
"allow_pod_launching",
"multi_ns",
"expected_kind",
"expected_name",
"expected_secret_verbs",
),
[
(True, True, False, "Role", "release-name-pod-launcher-role"),
(True, True, True, "ClusterRole", "default-release-name-pod-launcher-role"),
(True, False, False, None, None),
(False, True, False, None, None),
(True, True, False, "Role", "release-name-pod-launcher-role", {"create", "patch"}),
(
True,
True,
True,
"ClusterRole",
"default-release-name-pod-launcher-role",
{"create", "patch"},
),
(True, False, False, None, None, None),
(False, True, False, None, None, None),
],
)
def test_pod_launcher_role(
self, rbac_create, allow_pod_launching, multi_ns, expected_kind, expected_name
self, rbac_create, allow_pod_launching, multi_ns, expected_kind, expected_name, expected_secret_verbs
):
docs = render_chart(
values={
Expand All @@ -49,6 +63,10 @@ def test_pod_launcher_role(
else:
assert docs[0]["kind"] == expected_kind
assert docs[0]["metadata"]["name"] == expected_name
rules = jmespath.search("rules", docs[0])
secrets_rule = next((r for r in rules if "secrets" in r.get("resources", [])), None)
assert secrets_rule is not None
assert set(secrets_rule["verbs"]) == expected_secret_verbs

@pytest.mark.parametrize(
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, generate_pod_command_args
from airflow.providers.cncf.kubernetes.pod_generator import (
WORKLOAD_SECRET_NAME_PREFIX,
PodGenerator,
generate_pod_command_args,
)
from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
from airflow.utils import cli as cli_utils, yaml
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
Expand Down Expand Up @@ -187,6 +191,8 @@
break
list_kwargs["_continue"] = continue_token

_cleanup_orphaned_workload_secrets(kube_client, namespace)


def _delete_pod(name, namespace):
"""
Expand All @@ -199,3 +205,35 @@
print(f'Deleting POD "{name}" from "{namespace}" namespace')
api_response = kube_client.delete_namespaced_pod(name=name, namespace=namespace, body=delete_options)
print(api_response)
_delete_workload_secret(name, namespace)


def _cleanup_orphaned_workload_secrets(kube_client, namespace: str) -> None:
    """Delete workload secrets whose owner pod no longer exists.

    Lists all pods in *namespace*, then every secret labelled
    ``airflow-workload-secret=true``; any secret whose name maps to a pod that
    is no longer present is deleted.  A 404 from the delete call is ignored
    (the secret is already gone); other API errors are reported on stderr and
    cleanup continues with the remaining secrets.
    """
    print(f"Checking for orphaned workload secrets in namespace {namespace}")
    prefix = WORKLOAD_SECRET_NAME_PREFIX + "-"
    live_pods = {
        pod.metadata.name for pod in kube_client.list_namespaced_pod(namespace=namespace).items
    }
    workload_secrets = kube_client.list_namespaced_secret(
        namespace=namespace, label_selector="airflow-workload-secret=true"
    )
    for secret in workload_secrets.items:
        name = secret.metadata.name
        # The owning pod's name is the secret name minus the shared prefix.
        owner_pod = name.removeprefix(prefix)
        if owner_pod in live_pods:
            continue
        print(f'Deleting orphaned workload secret "{name}"')
        try:
            kube_client.delete_namespaced_secret(name=name, namespace=namespace)
        except ApiException as e:
            if e.status != 404:
                print(f'Failed to delete orphaned workload secret "{name}": {e}', file=sys.stderr)


def _delete_workload_secret(pod_name: str, namespace: str) -> None:
    """Delete the workload secret associated with *pod_name* if it exists.

    The KubernetesExecutor creates one secret per worker pod, named with the
    shared ``WORKLOAD_SECRET_NAME_PREFIX``; this removes it after the pod has
    been deleted.  A 404 from the API is ignored (the secret may already be
    gone, e.g. removed via its ownerReference); any other API error is
    reported but not raised, so pod cleanup can continue.
    """
    kube_client = get_kube_client()
    # Build the name from the shared prefix constant instead of a hard-coded
    # "airflow-workload-" string, so it cannot drift from secret creation and
    # orphan cleanup, which both use WORKLOAD_SECRET_NAME_PREFIX.
    secret_name = f"{WORKLOAD_SECRET_NAME_PREFIX}-{pod_name}"
    try:
        kube_client.delete_namespaced_secret(name=secret_name, namespace=namespace)
        print(f'Deleted workload secret "{secret_name}" from "{namespace}" namespace')
    except ApiException as e:
        if e.status != 404:
            # Report on stderr, matching _cleanup_orphaned_workload_secrets.
            print(f'Failed to delete workload secret "{secret_name}": {e}', file=sys.stderr)
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from urllib3.exceptions import ReadTimeoutError

from airflow.providers.cncf.kubernetes.backcompat import get_logical_date_key
from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiPermissionError
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
ALL_NAMESPACES,
Expand All @@ -44,7 +45,12 @@
annotations_to_key,
create_unique_id,
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, workload_to_command_args
from airflow.providers.cncf.kubernetes.pod_generator import (
WORKLOAD_SECRET_NAME_PREFIX,
PodGenerator,
make_safe_label_value,
workload_to_command_args_json_path,
)
from airflow.providers.common.compat.sdk import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -553,12 +559,50 @@ def run_next(self, next_job: KubernetesJob) -> None:
pod_template_file = next_job.pod_template_file

dag_id, task_id, run_id, try_number, map_index = key

pod_id = create_unique_id(dag_id, task_id)
secret_name: str | None = None

if len(command) == 1:
from airflow.executors.workloads import ExecuteTask

if isinstance(command[0], ExecuteTask):
workload = command[0]
command = workload_to_command_args(workload)
secret_name = f"{WORKLOAD_SECRET_NAME_PREFIX}-{pod_id}"
labels: dict[str, str] = {
"airflow-workload-secret": "true",
"dag_id": make_safe_label_value(workload.ti.dag_id),
"task_id": make_safe_label_value(workload.ti.task_id),
"run_id": make_safe_label_value(workload.ti.run_id),
"try_number": str(workload.ti.try_number),
"ti_id": str(workload.ti.id),
}
if workload.ti.map_index is not None and workload.ti.map_index >= 0:
labels["map_index"] = str(workload.ti.map_index)
try:
self.kube_client.create_namespaced_secret(
namespace=self.namespace,
body=client.V1Secret(
metadata=client.V1ObjectMeta(
name=secret_name,
namespace=self.namespace,
labels=labels,
),
string_data={"workload.json": workload.model_dump_json()},
),
)
except ApiException as e:
if e.status == 403:
raise KubernetesApiPermissionError(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seemed like the right exception to raise 🤷🏽

f"Failed to create workload secret '{secret_name}' in namespace "
f"'{self.namespace}': permission denied (HTTP 403). "
"Ensure the RBAC pod-launcher-role has 'create' and 'patch' verbs for "
"'secrets'. If you recently upgraded the cncf-kubernetes provider, "
"update the pod-launcher-role in your helm charts or grant the missing "
"permissions manually."
) from e
raise
command = workload_to_command_args_json_path()
else:
raise ValueError(
f"KubernetesExecutor doesn't know how to handle workload of type: {type(command[0])}"
Expand All @@ -576,7 +620,7 @@ def run_next(self, next_job: KubernetesJob) -> None:
pod = PodGenerator.construct_pod(
namespace=self.namespace,
scheduler_job_id=self.scheduler_job_id,
pod_id=create_unique_id(dag_id, task_id),
pod_id=pod_id,
dag_id=dag_id,
task_id=task_id,
kube_image=self.kube_config.kube_image,
Expand All @@ -588,7 +632,9 @@ def run_next(self, next_job: KubernetesJob) -> None:
pod_override_object=kube_executor_config,
base_worker_pod=base_worker_pod,
with_mutation_hook=True,
workload_secret_name=secret_name,
)

# Reconcile the pod generated by the Operator and the Pod
# generated by the .cfg file
self.log.info(
Expand All @@ -600,9 +646,58 @@ def run_next(self, next_job: KubernetesJob) -> None:
self.log.debug("Kubernetes running for command %s", command)
self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image)

# the watcher will monitor pods, so we do not block.
self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
self.log.debug("Kubernetes Job created!")
try:
resp = self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
except Exception:
if secret_name:
try:
self.kube_client.delete_namespaced_secret(secret_name, self.namespace)
except ApiException:
self.log.debug(
"Failed to clean up workload secret %s after pod creation failure; "
"it will be removed by the cleanup CronJob.",
secret_name,
exc_info=True,
)
raise

if secret_name:
try:
self.kube_client.patch_namespaced_secret(
name=secret_name,
namespace=self.namespace,
body={
"metadata": {
"ownerReferences": [
{
"apiVersion": "v1",
"kind": "Pod",
"name": resp.metadata.name,
"uid": resp.metadata.uid,
# Pod should not wait on secret to be deleted
"blockOwnerDeletion": False,
}
]
}
},
)
except ApiException as e:
if e.status == 403:
self.log.warning(
"Could not set ownerReference on workload secret %s: permission denied (HTTP 403). "
"Ensure the scheduler's RBAC role grants the 'patch' verb on 'secrets'. "
"If you recently upgraded the cncf-kubernetes provider, update the "
"pod-launcher-role in your Helm chart. "
"The cleanup CronJob will delete the secret as a fallback.",
secret_name,
)
else:
self.log.warning(
"Could not set ownerReference on workload secret %s; "
"as a fallback the cleanup CronJob will delete it.",
secret_name,
exc_info=True,
)

def delete_pod(self, pod_name: str, namespace: str) -> None:
"""Delete Pod from a namespace; does not raise if it does not exist."""
Expand All @@ -616,7 +711,7 @@ def delete_pod(self, pod_name: str, namespace: str) -> None:
)
except ApiException as e:
# If the pod is already deleted
if str(e.status) != "404":
if e.status != 404:
raise

def patch_pod_revoked(self, *, pod_name: str, namespace: str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ async def delete_pod(self, name: str, namespace: str):
)
except async_client.ApiException as e:
# If the pod is already deleted
if str(e.status) != "404":
if e.status != 404:
raise

@generic_api_retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,5 +364,5 @@ def delete_spark_job(self, spark_job_name=None):
)
except ApiException as e:
# If the pod is already deleted
if str(e.status) != "404":
if e.status != 404:
raise
Loading