Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/dag_processing/bundles/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ class BaseDagBundle(ABC):
:param refresh_interval: How often the bundle should be refreshed from the source in seconds
(Optional - defaults to [dag_processor] refresh_interval)
:param version: Version of the DAG bundle (Optional)
:param version_data: Structured metadata for this bundle version, e.g. an S3 manifest.
Only populated for pinned runs (where dag_run.bundle_version is not None). (Optional)
"""

supports_versioning: bool = False
Expand All @@ -304,10 +306,12 @@ def __init__(
name: str,
refresh_interval: int = conf.getint("dag_processor", "refresh_interval"),
version: str | None = None,
version_data: dict[str, Any] | None = None,
Comment thread
o-nikolas marked this conversation as resolved.
view_url_template: str | None = None,
) -> None:
self.name = name
self.version = version
self.version_data = version_data
self.refresh_interval = refresh_interval
self.is_initialized: bool = False

Expand Down
9 changes: 7 additions & 2 deletions airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,19 +324,24 @@ def _extract_template_params(bundle_instance: BaseDagBundle) -> dict:

return params

def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
def get_bundle(
self, name: str, version: str | None = None, version_data: dict | None = None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: type drift across the three layers. workloads/base.py:69 uses dict[str, Any] | None and the BundleVersion dataclass at bundles/base.py:309 also uses dict[str, Any] | None, but here it's a bare dict | None. Worth tightening to dict[str, Any] | None so the signatures agree and mypy callers don't have to widen on this hop.

) -> BaseDagBundle:
"""
Get a DAG bundle by name.

:param name: The name of the DAG bundle.
:param version: The version of the DAG bundle you need (optional). If not provided, ``tracking_ref`` will be used instead.
:param version_data: Optional structured data associated with this version (e.g., S3 manifest).

:return: The DAG bundle.
"""
cfg_bundle = self._bundle_config.get(name)
if not cfg_bundle:
raise ValueError(f"Requested bundle '{name}' is not configured.")
return cfg_bundle.bundle_class(name=name, version=version, **cfg_bundle.kwargs)
return cfg_bundle.bundle_class(
name=name, version=version, version_data=version_data, **cfg_bundle.kwargs
)

def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]:
"""
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/executors/workloads/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import os
from abc import ABC, abstractmethod
from collections.abc import Hashable
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, ConfigDict, Field

Expand Down Expand Up @@ -66,6 +66,7 @@ class BundleInfo(BaseModel):

name: str
version: str | None = None
version_data: dict[str, Any] | None = None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

version_data is serialized end-to-end on every workload payload (executor channel, JWT, possibly SQS/Kafka/K8s pod env). The first bundle implementation that populates this with a per-DAG-file manifest could blow through SQS's 256 KB message limit or the ~1 MB K8s pod-env cap on large multi-tenant deployments.

Worth one of: a docstring with a soft size cap, a Pydantic validator that warns above a threshold, or a documented side-channel pattern (S3 URL pointer + worker fetch) before any consumer ships a manifest-style payload here.



class BaseWorkloadSchema(BaseModel):
Expand Down
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/executors/workloads/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,13 @@ def make(

ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True)
if not bundle_info:
version_data = None
if ti.dag_version is not None and ti.dag_run.bundle_version is not None:
version_data = ti.dag_version.version_data
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why of this read off ti, but other things just below are ti.dag_model.bundle*

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth flagging that even off the same ti, version and version_data can disagree. Two cases:

  1. Unpinned runs (disable_bundle_versioning=True): dag_run.bundle_version is None, but ti.dag_version.version_data may still carry a manifest -- so version=None, version_data={...}.
  2. After _verify_integrity_if_dag_changed (scheduler_job_runner.py:2521-2530): TI's dag_version_id is bumped to the latest version while dag_run.bundle_version is left untouched, so version_data describes a newer version than version reports.

The scheduler picks a deliberate rule for bundle_version at scheduler_job_runner.py:1438-1442; worth deciding the equivalent rule for version_data here (e.g., is it valid to expose version_data when version is None?).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashb - What we're passing along here is the version that is being used and the bundle to use to grab it. The bundle itself isn't tracking the current version of the dag, that's on the dag_version. So once on the worker side, we'll ask the bundle to load the state of the bundle for the specific version the dag is trying to run with. So properties from these two things are combined later. Does that make sense?

@kaxil Fair point, I think the right rule is: version_data should only be populated when the run is pinned (i.e., dag_run.bundle_version is not None). If the run is unpinned, the worker should use the latest bundle state anyway, so sending stale version_data would be misleading. I'll add a guard:

version_data = None
if ti.dag_version is not None and ti.dag_run.bundle_version is not None:
    version_data = ti.dag_version.version_data 

This mirrors the existing rule for bundle_version at scheduler_job_runner.py:1438-1442.

For the second case the version_data will still come from the ti's dag_version (which may be newer than the run's), but since the run is pinned, the bundle will use version_data to check out the right code regardless. If we want stricter alignment (version_data always matches bundle_version), we'd need to look up the DagVersion by bundle_version string rather than using ti.dag_version but that adds another query and I think the current ti-based approach is correct for the worker's needs. Thoughts?

bundle_info = BundleInfo(
name=ti.dag_model.bundle_name,
version=ti.dag_run.bundle_version,
version_data=version_data,
)
fname = log_filename_template_renderer()(ti=ti)

Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
ranked_query.c.map_index_for_ordering,
)
.options(selectinload(TI.dag_model))
.options(selectinload(TI.dag_version))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, joins aren't free, and this isn't used for most places.

I'm wondering if this needs to be based on what the bundle backend needs somehow?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

selectinload uses a single additional SELECT batched across all tis in the result set (not n sized query), so the cost is one extra round-trip per scheduler loop iteration rather than per ti. It's hard to make it conditional because we're working with a bunch of tis that are mixed between versioned bundles and not. We could do a lazy load later, but then it becomes many requests, one for each TI lazy loaded. I think the impact here is pretty minimal given the single extra batched select (no new joins are happening here now)?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This selectinload matters because TIs go through make_transient() before ExecuteTask.make(ti) reads ti.dag_version.version_data. Lazy loads on transient objects return None silently rather than raising DetachedInstanceError, so if a future "trim queries" pass drops this line, version_data flows to workers as None with no test or log failure. Worth either an inline comment pinning the dependency, or a round-trip test in tests/unit/jobs/test_scheduler_job.py that asserts version_data survives _enqueue_task_instances_with_queued_state.

Also flagging: this adds a second selectinload inside the FOR UPDATE SKIP LOCKED critical section, one extra round-trip while row locks are held. Probably fine for now, but worth keeping an eye on if scheduler contention regresses.

)

query = query.limit(max_tis)
Expand Down
13 changes: 13 additions & 0 deletions airflow-core/tests/unit/dag_processing/bundles/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,3 +323,16 @@ def test_bundle_version_inequality(self):
bv1 = BundleVersion(version="abc", data={"key": "val"})
bv2 = BundleVersion(version="abc", data={"key": "other"})
assert bv1 != bv2


def test_version_data_stored_on_bundle():
"""Test that version_data passed to a bundle constructor is stored on the instance."""
manifest = {"schema_version": 1, "files": {"dags/my_dag.py": "S3VersionId123"}}
bundle = BasicBundle(name="test", version="abc", version_data=manifest)
assert bundle.version_data == manifest


def test_version_data_defaults_to_none():
"""Test that version_data defaults to None when not provided."""
bundle = BasicBundle(name="test")
assert bundle.version_data is None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two tests verify the constructor stores version_data, but nothing exercises the actual threading from TI to BundleInfo: ExecuteTask.make(ti) populating BundleInfo.version_data from ti.dag_version.version_data under both pinned and unpinned runs.

Worth a test that builds a TI with dag_version.version_data={"x": 1} + dag_run.bundle_version="v1", calls ExecuteTask.make(ti), and asserts BundleInfo.version_data == {"x": 1}; same setup with bundle_version=None should produce version_data is None. That's what pins the case-2 (pinned-run) behavior the guard at workloads/task.py:122 now relies on.

Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,7 @@ def test_try_adopt_task_instances(self, mock_executor):
task.dag_model = mock.Mock()
task.dag_model.bundle_name = "test_bundle"
task.dag_model.relative_fileloc = "test_dag.py"
task.dag_version = mock.Mock(version_data=None)
task.dag_run = mock.Mock()
task.dag_run.bundle_version = "1.0.0"
task.dag_run.context_carrier = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,12 @@ components:
- type: string
- type: 'null'
title: Version
version_data:
anyOf:
- additionalProperties: true
type: object
- type: 'null'
title: Version Data
type: object
required:
- name
Expand Down
1 change: 1 addition & 0 deletions task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ class BundleInfo(BaseModel):

name: Annotated[str, Field(title="Name")]
version: Annotated[str | None, Field(title="Version")] = None
version_data: Annotated[dict[str, Any] | None, Field(title="Version Data")] = None


class TerminalTIState(str, Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def _target():
bundle = DagBundlesManager().get_bundle(
name=bundle_info.name,
version=bundle_info.version,
version_data=bundle_info.version_data,
)
bundle.initialize()
if (bundle_path := str(bundle.path)) not in sys.path:
Expand Down
13 changes: 13 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4961,6 +4961,19 @@
],
"default": null,
"title": "Version"
},
"version_data": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Version Data"
}
},
"required": [
Expand Down
1 change: 1 addition & 0 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
bundle_instance = DagBundlesManager().get_bundle(
name=bundle_info.name,
version=bundle_info.version,
version_data=bundle_info.version_data,
)
bundle_instance.initialize()
_verify_bundle_access(bundle_instance, log)
Expand Down
6 changes: 3 additions & 3 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading