Add run_after to TriggerDagRunOperator#62259
arnoldmr01 wants to merge 12 commits into apache:main from
Conversation
jason810496
left a comment
Thanks for the PR! LGTM overall.
@arnoldmr01 This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:

What to do next:

Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. If you have questions, feel free to ask on the Airflow Slack.
jason810496
left a comment
LGTM and it will be good to go after resolving conflict of uv.lock. Thanks!
jason810496
left a comment
Mark as request changes in case of further regression on main CI.
```toml
requires-dist = [
    { name = "aiobotocore", marker = "extra == 'aiobotocore'", specifier = ">=3.0.0" },
    { name = "aiobotocore", extras = ["boto3"], marker = "extra == 'aiobotocore'", specifier = ">=2.26.0" },
    { name = "apache-airflow", editable = "." },
```
Even though the CI passed, I'm still concerned about the uv.lock change. Would it be possible to revert the uv.lock change before we merge?
Pull request overview
Adds `run_after` support to `TriggerDagRunOperator` and wires it through the Task SDK client/message flow and the Execution API request schema so operators can trigger parallel DagRuns without relying on `logical_date`.

Changes:
- Add `run_after` parameter handling to `TriggerDagRunOperator` (including Airflow 3 exception payload wiring).
- Extend the Task SDK `TriggerDagRun` message, API client payload, and supervisor dispatch to include `run_after`.
- Extend the Execution API `TriggerDAGRunPayload` (and versioning migrations) to accept `run_after`, and pass it into `trigger_dag`.
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| `providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py` | Add `run_after` parameter + parsing/dispatch; adjust AF2 warning behavior. |
| `providers/standard/src/airflow/providers/standard/version_compat.py` | Add `NOTSET` import + `is_arg_set` compatibility helper. |
| `providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py` | Add unit test coverage for `run_after`; adjust AF2 warning assertion. |
| `task-sdk/src/airflow/sdk/api/client.py` | Add `run_after` to `dag_runs.trigger()` and payload. |
| `task-sdk/src/airflow/sdk/api/datamodels/_generated.py` | Add `run_after` field to generated `TriggerDAGRunPayload`. |
| `task-sdk/src/airflow/sdk/exceptions.py` | Extend `DagRunTriggerException` to carry `run_after`. |
| `task-sdk/src/airflow/sdk/execution_time/supervisor.py` | Pass `run_after` through supervisor -> client trigger call. |
| `task-sdk/src/airflow/sdk/execution_time/task_runner.py` | Include `run_after` when translating the trigger exception into a supervisor message. |
| `task-sdk/tests/task_sdk/execution_time/test_supervisor.py` | Update request-case expectations for the new `trigger()` signature and add a `run_after` case. |
| `airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py` | Add `run_after` field to the Execution API request model. |
| `airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py` | Forward `payload.run_after` into `trigger_dag()`. |
| `airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py` | Add Cadwyn version change entry for the `run_after` field (plus migration instructions). |
| `airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py` | Register the new version change. |
| `uv.lock` | Updates `aiobotocore` requirement metadata in the lockfile. |
Comments suppressed due to low confidence (1)
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:2159
`REQUEST_TEST_CASES` now contains two entries with the same `test_id` (both `dag_run_trigger`). Since these IDs are used by pytest parametrization (`ids=lambda tc: tc.test_id`), please make them unique (e.g. differentiate the "with note" vs "without note" case) to keep test output/debugging unambiguous.
```python
        response=OKResponse(ok=True),
    ),
    test_id="dag_run_trigger",
),
RequestTestCase(
    message=TriggerDagRun(
        dag_id="test_dag",
        run_id="test_run",
        conf={"key": "value"},
        logical_date=timezone.datetime(2025, 1, 1),
        reset_dag_run=True,
    ),
    expected_body={"ok": True, "type": "OKResponse"},
    client_mock=ClientMock(
        method_path="dag_runs.trigger",
        args=("test_dag", "test_run", {"key": "value"}, timezone.datetime(2025, 1, 1), None, True, None),
        response=OKResponse(ok=True),
    ),
    test_id="dag_run_trigger",
),
```
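The duplicate-id problem is easy to reproduce outside pytest. Note that pytest itself disambiguates duplicate parametrize ids by appending numeric suffixes, but explicit unique ids keep test output readable. A minimal sketch (stand-in `RequestTestCase`, hypothetical second id):

```python
from dataclasses import dataclass

@dataclass
class RequestTestCase:  # stand-in for the helper in test_supervisor.py
    test_id: str

cases = [RequestTestCase("dag_run_trigger"), RequestTestCase("dag_run_trigger")]
ids = [tc.test_id for tc in cases]  # what ids=lambda tc: tc.test_id would produce
has_duplicates = len(ids) != len(set(ids))
print(has_duplicates)  # True: two cases become indistinguishable in test output

# Differentiating the second case, as the comment suggests (hypothetical name):
cases[1] = RequestTestCase("dag_run_trigger_without_note")
unique_ids = [tc.test_id for tc in cases]
print(len(unique_ids) == len(set(unique_ids)))  # True: ids are now unique
```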
```python
unsupported_parameters = [
    attr
    for attr in self.attributes_not_suppported_in_airflow_2
    if getattr(self, attr, NOTSET) is not NOTSET or None
]
```

Modified `attributes_not_supported_in_airflow_2` to a dict, e.g.

```python
attributes_not_supported_in_airflow_2 = {
    "run_after": NOTSET,
    "note": None,
}
```

The keys are the unsupported attributes and the values are their default values. The logic to check whether those attributes were set by the user is:

```python
unsupported_parameters = []
for attr, default_value in self.attributes_not_supported_in_airflow_2.items():
    value = getattr(self, attr, default_value)
    if value is not default_value:
        unsupported_parameters.append(attr)
```
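The subtle part of the flagged comprehension is operator precedence: `x is not NOTSET or None` parses as `(x is not NOTSET) or None`, so the trailing `or None` is dead code and a `None`-defaulted attribute like `note` gets flagged even when the user never set it. A standalone mimic (local `NOTSET` sentinel, not Airflow's):

```python
class _NotSet:
    def __repr__(self):
        return "NOTSET"

NOTSET = _NotSet()  # stand-in sentinel for illustration

class Op:
    run_after = NOTSET  # user never set it
    note = None         # user never set it

op = Op()

# Original check: `(getattr(...) is not NOTSET) or None`, so `note=None`
# passes the `is not NOTSET` test and is falsely flagged.
buggy = [a for a in ("run_after", "note") if getattr(op, a, NOTSET) is not NOTSET or None]
print(buggy)  # ['note']

# Reviewer's dict approach: compare each attribute against its own default.
defaults = {"run_after": NOTSET, "note": None}
fixed = [a for a, d in defaults.items() if getattr(op, a, d) is not d]
print(fixed)  # []
```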
```python
attributes_not_suppported_in_airflow_2: Sequence[str] = (
    "run_after",
    "note",
)
```

Modified `attributes_not_supported_in_airflow_2` to

```python
attributes_not_supported_in_airflow_2 = {
    "run_after": NOTSET,
    "note": None,
}
```

and fixed the typo (`suppported` -> `supported`).
```python
class AddRunAfterFiled(VersionChange):
    """Add run_after parameter to TriggerDAGRunPayload Model."""

    description = __doc__

    instructions_to_migrate_to_previous_version = (
        schema(TriggerDAGRunPayload).field("run_after").didnt_exist,
    )
```

Changed `AddRunAfterFiled` to `AddRunAfterField`.
```diff
 def trigger(
     self,
     dag_id: str,
     run_id: str,
     conf: dict | None = None,
     logical_date: datetime | None = None,
+    run_after: datetime | None = None,
     reset_dag_run: bool = False,
     note: str | None = None,
 ) -> OKResponse | ErrorResponse:
     """Trigger a Dag run via the API server."""
     body = TriggerDAGRunPayload(
-        logical_date=logical_date, conf=conf or {}, reset_dag_run=reset_dag_run, note=note
+        logical_date=logical_date,
+        conf=conf or {},
+        reset_dag_run=reset_dag_run,
+        note=note,
+        run_after=run_after,
     )
```
feat: add run_after support to TriggerDagRunOperator
Why
To let `TriggerDagRunOperator` accept a `run_after` parameter so that multiple DagRuns of the same DAG can be triggered in parallel without being constrained by `logical_date`.
What does the PR change
Was generative AI tooling used to co-author this PR?
`{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in `airflow-core/newsfragments`.