Fix missing task.queued_duration metric in Airflow 3 #67592
The metric was only emitted by TaskInstance.emit_state_change_metric, which was only called from the legacy LocalTaskJob path (_check_and_change_state_before_execution). Airflow 3 workers run via the Task SDK and supervisor, which flip TI state to RUNNING through the ti_run Execution API endpoint instead — that path bypassed the emit site, so task.queued_duration (and its registry-derived legacy name dag.<dag_id>.<task_id>.queued_duration) stopped firing entirely. Emit the metric from the ti_run endpoint at the same moment it flips state from QUEUED to RUNNING. Skip the emit when end_date is already set (deferral resume) or queued_dttm is missing, mirroring the existing guards in emit_state_change_metric.
    # this is a deferral resume or similar, and the timing would be misleading).
    # The registry-based legacy name dag.<dag_id>.<task_id>.queued_duration is
    # emitted automatically by stats.timing via metrics_template.yaml.
    if ti.queued_dttm is not None and ti.end_date is None:
Could we include ti.next_method is None in the guard to avoid emitting this metric again for deferred task resumes? In the Execution API path, deferred tasks are marked by next_method / next_kwargs, and end_date may still be None, so end_date is None alone does not reliably identify the first run. wdyt?
Good catch, thanks! You're right that end_date stays None across a deferral, so I've added next_method is None to the guard and a deferral_resume test case. Done in 1da293d
A deferred task resuming through the ti_run endpoint passes QUEUED -> RUNNING again, but deferral does not set end_date, so the previous end_date-only guard re-emitted task.queued_duration on every resume. Also gate on next_method (how this endpoint already detects deferral resumes) so the metric fires only on the genuine first run of a try.
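The guard described in this commit can be sketched in isolation. This is a minimal illustration, not the code from the PR: `FakeTI`, `should_emit_queued_duration`, and `queued_duration` are hypothetical names, and only the three fields named in the discussion (`queued_dttm`, `end_date`, `next_method`) are modelled.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class FakeTI:
    """Hypothetical stand-in for the task instance seen by the ti_run endpoint."""

    queued_dttm: Optional[datetime]
    end_date: Optional[datetime] = None
    next_method: Optional[str] = None  # populated when the task was deferred


def should_emit_queued_duration(ti: FakeTI) -> bool:
    """Return True only for the first QUEUED -> RUNNING transition of a try."""
    return (
        ti.queued_dttm is not None  # nothing to measure without a queue time
        and ti.end_date is None  # the try has not already finished
        and ti.next_method is None  # not a deferral resume
    )


def queued_duration(ti: FakeTI, now: datetime) -> Optional[timedelta]:
    """Compute the time spent queued, or None when the metric should be skipped."""
    if not should_emit_queued_duration(ti):
        return None
    return now - ti.queued_dttm


now = datetime(2025, 1, 1, 12, 0, 30, tzinfo=timezone.utc)
queued_at = now - timedelta(seconds=30)

first_run = FakeTI(queued_dttm=queued_at)
deferral_resume = FakeTI(queued_dttm=queued_at, next_method="execute_complete")

print(queued_duration(first_run, now))
print(queued_duration(deferral_resume, now))
```

In this sketch the resume case has `end_date` still unset, so only the `next_method` check keeps it from producing a second reading, which is the situation raised in the review.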
Summary
task.queued_duration (and its registry-derived legacy name dag.<dag_id>.<task_id>.queued_duration) stopped firing entirely after the Airflow 3 worker switched to the Task SDK / supervisor / Execution API.
The metric was only emitted by TaskInstance.emit_state_change_metric, which is only reachable from _check_and_change_state_before_execution, the legacy LocalTaskJob path. Airflow 3 workers flip TI state to RUNNING through the ti_run Execution API endpoint instead, which bypasses the emit site.
This is the same regression pattern as #62019 (missing ti.start / ti.finish).
Fix
Emit task.queued_duration from ti_run at the moment it transitions the TI from QUEUED to RUNNING. The skip guards (end_date is None, queued_dttm is not None) mirror the existing logic in emit_state_change_metric so that deferral resumes don't get a misleading second reading. The legacy dotted name is emitted automatically by stats.timing via the metrics_template.yaml registry, so no manual second call is needed.
Test plan
- test_ti_run_emits_queued_duration_metric confirmed to fail before the fix and pass after (verified by stashing the production change and re-running the test).
- test_ti_run_skips_queued_duration_metric covers both skip conditions (end_date set / queued_dttm missing).
- TestTIRunState tests still pass.
- ruff format / ruff check / mypy-airflow-core / prek run --from-ref upstream/main --stage pre-commit all green.
closes: #63503
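The shape of the emit and skip tests in the test plan can be illustrated with a self-contained sketch built on `unittest.mock`. Everything here is an assumption made for illustration: `emit_on_run` is a hypothetical reduction of the emit site, `make_ti` builds a throwaway object rather than a real task instance, and the `stats.timing` call omits any tags the real call may pass. The actual tests in the PR exercise the ti_run endpoint itself.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from unittest import mock

NOW = datetime(2025, 1, 1, 12, 0, 30, tzinfo=timezone.utc)
QUEUED_AT = NOW - timedelta(seconds=30)


def emit_on_run(ti, stats, now):
    """Hypothetical emit site: time only a genuine first run of a try."""
    if ti.queued_dttm is not None and ti.end_date is None and ti.next_method is None:
        stats.timing("task.queued_duration", now - ti.queued_dttm)


def make_ti(**overrides):
    """Build a minimal fake task instance; defaults describe a first run."""
    fields = {"queued_dttm": QUEUED_AT, "end_date": None, "next_method": None}
    fields.update(overrides)
    return SimpleNamespace(**fields)


def test_emits_on_first_run():
    stats = mock.MagicMock()
    emit_on_run(make_ti(), stats, NOW)
    stats.timing.assert_called_once_with(
        "task.queued_duration", timedelta(seconds=30)
    )


def test_skips_when_guard_fails():
    skip_cases = [
        {"end_date": NOW},  # try already has an end date
        {"queued_dttm": None},  # no queue timestamp to measure from
        {"next_method": "execute_complete"},  # deferral resume
    ]
    for overrides in skip_cases:
        stats = mock.MagicMock()
        emit_on_run(make_ti(**overrides), stats, NOW)
        stats.timing.assert_not_called()


test_emits_on_first_run()
test_skips_when_guard_fails()
```

Using a fresh mock per skip case keeps one failing condition from hiding another, and the third case corresponds to the deferral-resume scenario added during review.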
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.7) following the guidelines