AIP-76: Propagate partition_date to consumers of partitioned assets #67285

Open
nathadfield wants to merge 1 commit into apache:main from king:propagate-partition-date-to-consumers

Conversation

@nathadfield

@nathadfield nathadfield commented May 21, 2026

Contributor

What this does

Makes partition_date a first-class template variable on the consumer side of AIP-76 partitioned assets, so authors can write:

WHERE dt = "{{ partition_date | ds }}"

instead of slicing strings out of partition_key.
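
As a rough illustration of the difference, the plain-Python sketch below contrasts the two approaches. The key format and both helper names are assumptions made for this example, and the `ds`-style formatting is assumed to be a plain `YYYY-MM-DD` rendering of the datetime; none of this is code from the PR.

```python
from datetime import datetime, timezone

# Hypothetical values a consumer task might see; the key format is an assumption.
partition_key = "2026-05-21T00:00:00+00:00"
partition_date = datetime(2026, 5, 21, tzinfo=timezone.utc)


def ds_from_key(key: str) -> str:
    """Old approach: slice the date portion out of the key string."""
    # Only works while the key happens to start with YYYY-MM-DD.
    return key[:10]


def ds_from_date(value: datetime) -> str:
    """New approach: format the datetime, as a ds-style filter would."""
    return value.strftime("%Y-%m-%d")
```

The slicing version silently depends on the key layout, while the datetime version does not care how the key was spelled.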

How

  • The consumer DagRun's partition_date is resolved by the scheduler. Temporal and composite mappers derive it from the partition key via PartitionMapper.to_partition_date (the resolver landed in fix(scheduler): populate partition_date for temporal asset partitions #68266). IdentityMapper is the one case the scheduler can't resolve, since its key carries no datetime, so this PR carries the producer DagRun's partition_date onto the AssetPartitionDagRun (threaded in via register_asset_change, not stored on AssetEvent) and passes that carry into the scheduler's resolver, which returns it when no temporal mapper contributes a date.
  • partition_date is populated whenever the consumer's mapper can resolve the key to a datetime: directly for the StartOf*Mapper family, by delegation for RollupMapper / ChainMapper / FanOutMapper whose effective child is temporal, and by carry-through for IdentityMapper. Mappers whose key carries no temporal meaning (ProductMapper, AllowedKeyMapper, custom) leave it None, and those consumers fall back to partition_key.
  • partition_date is output-only on trigger payloads, so a manual trigger cannot create an inconsistent (partition_key, partition_date) pair.
  • Surfaced through to Context["partition_date"] (coerced to tz-aware), the execution-API DagRun (Cadwyn-versioned to strip the field for older Task SDK clients), the core-API DAGRunResponse, OTel span attributes, and the standard provider's PythonVirtualenvOperator / ExternalPythonOperator serializable-context allow-list.
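
The resolution rules in the list above (direct for temporal mappers, delegation for composites, nothing for identity) could be sketched roughly as follows. All class names here are hypothetical stand-ins, not Airflow's real mapper classes, and the ISO-8601 key format and UTC default are assumptions for the example.

```python
from datetime import datetime, timezone
from typing import Optional


class SketchMapper:
    """Hypothetical base class; not Airflow's PartitionMapper."""

    def to_partition_date(self, key: str) -> Optional[datetime]:
        # Default: the key carries no temporal meaning.
        return None


class SketchStartOfDayMapper(SketchMapper):
    """Temporal mapper: the key itself encodes the anchor datetime."""

    def to_partition_date(self, key: str) -> Optional[datetime]:
        # Assumes ISO-8601 keys; naive values are treated as UTC.
        parsed = datetime.fromisoformat(key)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.replace(hour=0, minute=0, second=0, microsecond=0)


class SketchRollupMapper(SketchMapper):
    """Composite mapper: delegates to the mapper it wraps."""

    def __init__(self, upstream_mapper: SketchMapper) -> None:
        self.upstream_mapper = upstream_mapper

    def to_partition_date(self, key: str) -> Optional[datetime]:
        return self.upstream_mapper.to_partition_date(key)


class SketchIdentityMapper(SketchMapper):
    """Identity mapper: inherits the None default, so the date must be
    carried over from the producer instead of derived from the key."""
```

In this sketch a composite resolves a date only when its effective child is temporal, which mirrors the behaviour described for RollupMapper / ChainMapper / FanOutMapper.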

Relationship to #68266

#68266 added the polymorphic PartitionMapper.to_partition_date and the scheduler's _resolve_partition_date, which now own temporal and composite resolution. This PR builds on that and adds the parts it doesn't cover: the IdentityMapper source-date carry and the exposure layer above. _resolve_partition_date gained a carried_partition_date parameter (the APDR's carried date) and keeps a plain datetime | None return: the carry is returned only when no temporal mapper contributes, and is never substituted for a date the resolver deliberately suppressed (conflicting temporal mappers, or a mapper that raised).
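
A minimal sketch of that return rule, assuming each contributing mapper is represented by a zero-argument callable that yields its anchor or None. The function name, the `Resolver` alias, and the callable-based shape are illustrative assumptions; the real `_resolve_partition_date` signature is not shown in this conversation.

```python
from datetime import datetime, timezone
from typing import Callable, Iterable, Optional

# Hypothetical stand-in for "a mapper's to_partition_date bound to one event key".
Resolver = Callable[[], Optional[datetime]]


def resolve_partition_date_sketch(
    resolvers: Iterable[Resolver],
    carried_partition_date: Optional[datetime],
) -> Optional[datetime]:
    anchors = set()
    for resolve in resolvers:
        try:
            anchor = resolve()
        except Exception:
            # A mapper raised: suppress, and never fall back to the carry.
            return None
        if anchor is not None:
            anchors.add(anchor)
    if len(anchors) > 1:
        # Temporal mappers disagree: suppress rather than pick one.
        return None
    if anchors:
        # Temporal mappers agree: their anchor wins over the carry.
        return next(iter(anchors))
    # Nothing temporal contributed: fall back to the carried date.
    return carried_partition_date
```

Keeping the suppression rule inside the resolver means the call site needs no separate fallback branch.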

An earlier revision of this PR also stored the source partition_date on AssetEvent; that column was dropped, since the consumer DagRun is self-contained once created and the source date is threaded at APDR creation instead.

Known limitations / follow-ups

  • For mappers whose key carries no temporal meaning (ProductMapper, AllowedKeyMapper, custom non-temporal mappers), partition_date is None and consumers fall back to partition_key.
  • If two events resolve the same target_partition_key to different partition_date values (two identity-mapped upstreams carrying different producer dates, or temporal mappers configured with different timezones), the carried date is suppressed to None rather than picked by arrival order, so the consumer DagRun is not stamped with an unstable value.
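
The suppression rule in the last bullet might look roughly like the sketch below. The dataclass, the in-memory dict store, and the sticky `date_conflicted` flag are all assumptions made for illustration; the PR's actual `_get_or_create_apdr` and its persistence model are not reproduced here.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class SketchAPDR:
    """Hypothetical stand-in for an AssetPartitionDagRun row."""

    target_partition_key: str
    partition_date: Optional[datetime] = None
    date_conflicted: bool = False  # assumption: remember that a conflict occurred


def get_or_create_apdr_sketch(
    store: Dict[str, SketchAPDR],
    target_key: str,
    source_date: Optional[datetime],
) -> SketchAPDR:
    apdr = store.get(target_key)
    if apdr is None:
        # First event for this key: carry its date as-is.
        apdr = SketchAPDR(target_key, partition_date=source_date)
        store[target_key] = apdr
        return apdr
    if apdr.date_conflicted or source_date is None:
        # Already suppressed, or this event has nothing to contribute.
        return apdr
    if apdr.partition_date is None:
        apdr.partition_date = source_date
    elif apdr.partition_date != source_date:
        # Two events disagree: drop the carry instead of trusting arrival order.
        apdr.partition_date = None
        apdr.date_conflicted = True
    return apdr
```

The flag keeps a later event from quietly restoring a date after a conflict, so the outcome does not depend on arrival order in this sketch.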

closes: #67239


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Code (Opus 4.8)

@boring-cyborg boring-cyborg Bot added area:airflow-ctl area:API Airflow's REST/HTTP API area:db-migrations PRs with DB migration area:providers area:Scheduler including HA (high availability) scheduler area:task-sdk area:UI Related to UI/UX. For Frontend Developers. backport-to-airflow-ctl/v0-1-test kind:documentation provider:standard labels May 21, 2026
@nathadfield nathadfield force-pushed the propagate-partition-date-to-consumers branch 5 times, most recently from 27b7c87 to bb67204 Compare May 22, 2026 09:35
@nathadfield nathadfield marked this pull request as ready for review May 22, 2026 12:51
@nathadfield nathadfield force-pushed the propagate-partition-date-to-consumers branch 2 times, most recently from 7bb59e8 to 47fcfab Compare June 2, 2026 11:28
@kaxil

kaxil commented Jun 2, 2026

Member

cc @Lee-W

@nathadfield nathadfield force-pushed the propagate-partition-date-to-consumers branch 2 times, most recently from 244b329 to cb36160 Compare June 8, 2026 11:45
@nathadfield

Contributor Author

@kaxil @Lee-W If one of you is able to review this I'd be really grateful. Cheers!

Comment thread uv.lock
Comment thread airflow-core/docs/migrations-ref.rst Outdated
@Lee-W

Lee-W commented Jun 8, 2026

Member

So {{ partition_date }} will be the partition_date of a DagRun, not of an AssetEvent. It might differ if we use a RollupMapper (since n different partition keys arrive and we only have one output key).

@nathadfield nathadfield force-pushed the propagate-partition-date-to-consumers branch 2 times, most recently from 4338ce1 to 28c6aa9 Compare June 8, 2026 13:46
@nathadfield

Contributor Author

so {{ partition_date }} will be partition_date for a DagRun not for AssetEvent. it might differ if we use a RollupMapper

Good point, @Lee-W. {{ partition_date }} is the consumer DagRun's date, and a RollupMapper collapses n upstream partitions (each with its own date) into one downstream partition, so no single source date describes the window. I've made _compute_target_partition_date return None for RollupMapper (explicit is_rollup guard plus a unit test), so rollup consumers get partition_date=None rather than an arbitrary first-event value and fall back to partition_key. Documented alongside the other mappers that leave it None.

@nathadfield nathadfield force-pushed the propagate-partition-date-to-consumers branch from 28c6aa9 to 27444f6 Compare June 9, 2026 08:20
@nathadfield nathadfield requested a review from Lee-W June 9, 2026 10:16
Comment on lines +633 to +634
``partition_key``. ``RollupMapper`` also leaves ``partition_date``
``None`` — a rollup collapses many upstream partitions, each with its

Member

I think RollupMapper should have partition_date in some cases. But I have another open PR handling it

Comment thread airflow-core/src/airflow/assets/manager.py Outdated
from airflow.partition_mappers.identity import IdentityMapper
from airflow.partition_mappers.temporal import _BaseTemporalMapper

if is_rollup(mapper):

Member

I'm now working on another PR to avoid this type switch. #68266 adds a polymorphic PartitionMapper.to_partition_date(key) where composites delegate (RollupMapper → upstream_mapper, FanOut, Chain) and temporal mappers return the anchor. Adopting it here would support rollup/fan-out/chain, and remove the isinstance/is_rollup branching.

Keep from this PR (not in #68266)

@nathadfield nathadfield force-pushed the propagate-partition-date-to-consumers branch from 27444f6 to f4808a4 Compare June 10, 2026 10:46
@nathadfield

Contributor Author

Thanks @Lee-W, that's helpful context.

Agreed on the type switch: I'll leave the is_rollup/isinstance branching in _compute_target_partition_date as-is here and let your polymorphic to_partition_date replace it once that lands, rather than couple this PR to an unmerged API.

From your list I'll keep on this side: the IdentityMapper passthrough (its key can't reconstruct a date, so the threaded source date is still needed), Context["partition_date"], the execution-API field plus Cadwyn versioning, DAGRunResponse, and the docs. That leaves your PR to own the scheduler-side DagRun.partition_date stamping for rollup, fan-out, and chain.

Same on RollupMapper: I've documented it as None here for now, and #68266 can relax that for the cases where a rolled-up window has a well-defined date.

@nathadfield nathadfield force-pushed the propagate-partition-date-to-consumers branch from f4808a4 to 1c1d9c9 Compare June 10, 2026 13:40
@Lee-W

Lee-W commented Jun 10, 2026

Member

Thanks for bearing with all the sudden changes 🙏 #68266 was just merged. I should be able to take a look again tomorrow. Thanks!

@nathadfield nathadfield force-pushed the propagate-partition-date-to-consumers branch 4 times, most recently from 4f63855 to 0c5aade Compare June 11, 2026 07:30
@nathadfield

Contributor Author

@Lee-W rebased this onto main now that #68266 is in, and reworked it so it builds on your resolver instead of re-implementing the same thing. Quick rundown for when you get a chance to look again.

The scheduler now leans entirely on your _resolve_partition_date / to_partition_date for temporal and composite mappers. I pulled out our old _compute_target_partition_date temporal and rollup branching and put temporal.py back to your version, so the to_downstream_normalized refactor is gone.

What's left on our side is just the bit #68266 can't do: the IdentityMapper case. Its key can't be turned back into a date, so we carry the producer's date onto the APDR and the scheduler falls back to that when the resolver doesn't return one. Plus the exposure layer that was always the point of this PR: Context["partition_date"], the execution-API DagRun field (Cadwyn-versioned), DAGRunResponse, OTel, and the docs.

One thing I changed in your code that I want to call out: _resolve_partition_date now returns (anchor, suppressed) instead of just datetime | None. I needed to tell apart "no temporal mapper, so fall back to the identity carry" from "temporal mappers disagreed or one raised, so leave it unset." Without that distinction, an identity carry on the same APDR could quietly paper over a conflict you'd just logged. So suppressed=True on the conflict/error paths, and (None, False) when nothing temporal contributed. test_resolve_partition_date is updated to match. Very open to a different shape if you'd rather signal that another way.

A couple of other things while I was in here. _get_or_create_apdr now drops the carried date to None when two events land on the same key with different dates, rather than letting arrival order pick the winner. And I fixed the docs that wrongly claimed RollupMapper and ChainMapper always leave partition_date as None, since they actually delegate through to_partition_date now (your earlier point about rollup).

Last one is more of a question. register_asset_change picked up a partition_date kwarg, next to the existing partition_key. A custom asset_manager_class on the old signature without **kwargs would hit a TypeError. I left it as normal signature evolution rather than bolting on a shim for a single param, but shout if you'd want a compat note in the changelog.
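
To make the compatibility concern concrete, here is a small self-contained sketch of the failure mode and of a `**kwargs`-based override that avoids it. The classes and the simplified keyword-only signature are hypothetical; the real `register_asset_change` takes more parameters than shown.

```python
from datetime import datetime, timezone
from typing import Optional


class SketchAssetManager:
    """Hypothetical stand-in for the base manager after this change."""

    def register_asset_change(
        self,
        *,
        asset: str,
        partition_key: Optional[str] = None,
        partition_date: Optional[datetime] = None,
    ) -> str:
        return f"{asset}:{partition_key}:{partition_date}"


class OldStyleManager(SketchAssetManager):
    # Override written against the old signature, with no **kwargs.
    def register_asset_change(self, *, asset, partition_key=None):
        return f"{asset}:{partition_key}"


class ForwardCompatibleManager(SketchAssetManager):
    # Accepts and forwards keyword arguments it does not know about.
    def register_asset_change(self, *, asset, partition_key=None, **kwargs):
        return super().register_asset_change(
            asset=asset, partition_key=partition_key, **kwargs
        )


def call_with_partition_date(manager: SketchAssetManager) -> str:
    """Calls the manager the way a caller passing the new kwarg would."""
    return manager.register_asset_change(
        asset="orders",
        partition_key="2026-05-21",
        partition_date=datetime(2026, 5, 21, tzinfo=timezone.utc),
    )
```

Calling `call_with_partition_date(OldStyleManager())` raises `TypeError` for the unexpected keyword, while the forwarding override keeps working.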

Thanks again for the steer on all this.

@Lee-W

Lee-W commented Jun 11, 2026

Member

The problem the _resolve_partition_date change is trying to solve sounds valid, but I'll take a deeper look and think about whether there's a better design 🤔 A tuple[..., ...] return is not really straightforward.

The register_asset_change part sounds like something we can improve. Let me take a look and see what we can do.

@nathadfield

Contributor Author

@Lee-W one idea: rather than returning a tuple, we could pass the carried date into the resolver and keep the return type a plain datetime | None, e.g. _resolve_partition_date(..., carried_partition_date=apdr.partition_date). Then the three exits are: anchors agree, return the anchor; nothing temporal contributed, return the carry; conflict or error, return None. The fallback branch at the call site disappears and the suppression rule lives in one place. Happy to push that if it works for you.

@Lee-W

Lee-W commented Jun 11, 2026

Member

Yep, sounds like a good idea!

Consumers of partitioned assets receive partition_key (str) but
partition_date (datetime) is None on the consumer DagRun, so templates
have to parse the key string. Propagate the datetime form alongside
the string so consumers can use the canonical filter idiom
`{{ partition_date | ds }}` and friends.

The consumer's partition_date is computed alongside its target
partition_key at APDR creation (in assets/manager.py:_queue_partitioned_dags),
threaded in from the producer DagRun's partition_date via register_asset_change
rather than stored on AssetEvent, and persisted on AssetPartitionDagRun. The
scheduler copies apdr.partition_date into the consumer DagRun, so the date
stays consistent with the mapper that produced the key. IdentityMapper passes
the source date through; the StartOf*Mapper family normalizes via
to_downstream_normalized; other mappers leave partition_date None and consumers
fall back to partition_key.

closes: apache#67239
@nathadfield nathadfield force-pushed the propagate-partition-date-to-consumers branch from 0c5aade to 26ef605 Compare June 11, 2026 09:08
@nathadfield

Contributor Author

Yep, sounds like a good idea!

Great! I've just added it.

Labels

area:airflow-ctl area:API Airflow's REST/HTTP API area:db-migrations PRs with DB migration area:providers area:Scheduler including HA (high availability) scheduler area:task-sdk area:UI Related to UI/UX. For Frontend Developers. backport-to-airflow-ctl/v0-1-test kind:documentation provider:standard ready for maintainer review Set after triaging when all criteria pass.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

AIP-76: propagate partition_date to consumers

5 participants