From c20149632ce492539a6d542fb24a8edb705dcb83 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Sat, 21 Feb 2026 09:32:15 -0500 Subject: [PATCH 01/12] feat: add run_after to DagRunTriggerException --- task-sdk/src/airflow/sdk/exceptions.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/task-sdk/src/airflow/sdk/exceptions.py b/task-sdk/src/airflow/sdk/exceptions.py index b69abe6226575..1f581fe225784 100644 --- a/task-sdk/src/airflow/sdk/exceptions.py +++ b/task-sdk/src/airflow/sdk/exceptions.py @@ -241,6 +241,7 @@ def __init__( dag_run_id: str, conf: dict | None, logical_date=None, + run_after=None, reset_dag_run: bool, skip_when_already_exists: bool, wait_for_completion: bool, @@ -255,6 +256,7 @@ def __init__( self.dag_run_id = dag_run_id self.conf = conf self.logical_date = logical_date + self.run_after = run_after self.reset_dag_run = reset_dag_run self.skip_when_already_exists = skip_when_already_exists self.wait_for_completion = wait_for_completion From b257c60d9147686a11e1f17aa8009d528a81e3f4 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Sat, 21 Feb 2026 09:32:48 -0500 Subject: [PATCH 02/12] feat: add support to TriggerDagRunOperator feat: add support to TriggerDagRunOperator --- .../standard/operators/trigger_dagrun.py | 60 ++++++++++++++++--- .../standard/operators/test_trigger_dagrun.py | 38 ++++++++++++ 2 files changed, 89 insertions(+), 9 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 26b990f9c9e3e..b6f0431095aa7 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -127,6 +127,7 @@ class TriggerDagRunOperator(BaseOperator): If not provided, a run ID will be automatically generated. :param conf: Configuration for the DAG run (templated). :param logical_date: Logical date for the triggered DAG (templated). + :param run_after: The date before which the triggered DAG should not run. :param reset_dag_run: Whether clear existing DAG run if already exists. This is useful when backfill or rerun an existing DAG run. This only resets (not recreates) the DAG run. @@ -162,6 +163,11 @@ class TriggerDagRunOperator(BaseOperator): "wait_for_completion", "skip_when_already_exists", ) + + attributes_not_supported_in_airflow_2 = { + "run_after": NOTSET, + "note": None, + } template_fields_renderers = {"conf": "py"} ui_color = "#ffefeb" operator_extra_links = [TriggerDagRunLink()] @@ -173,6 +179,7 @@ def __init__( trigger_run_id: str | None = None, conf: dict | None = None, logical_date: str | datetime.datetime | None | ArgNotSet = NOTSET, + run_after: str | datetime.datetime | None | ArgNotSet = NOTSET, reset_dag_run: bool = False, wait_for_completion: bool = False, poke_interval: int = 60, @@ -215,18 +222,38 @@ def __init__( f"Expected str, datetime.datetime, or None for parameter 'logical_date'. Got {type(logical_date).__name__}" ) + self.run_after = run_after + if run_after is NOTSET: + self.run_after = NOTSET + elif run_after is None or isinstance(run_after, (str, datetime.datetime)): + self.run_after = run_after + else: + raise TypeError( + f"Expected str, datetime.datetime, or None for parameter 'run_after'. Got {type(run_after).__name__}" + ) + if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS: raise NotImplementedError("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.x") def execute(self, context: Context): if self.logical_date is NOTSET: - # If no logical_date is provided we will set utcnow() - parsed_logical_date = timezone.utcnow() + if self.run_after is not NOTSET: + parsed_logical_date = None + else: + # If no logical_date is provided we will set utcnow() + parsed_logical_date = timezone.utcnow() elif self.logical_date is None or isinstance(self.logical_date, datetime.datetime): parsed_logical_date = self.logical_date # type: ignore elif isinstance(self.logical_date, str): parsed_logical_date = timezone.parse(self.logical_date) + if self.run_after is NOTSET: + parsed_run_after = parsed_logical_date + elif self.run_after is None or isinstance(self.run_after, datetime.datetime): + parsed_run_after = self.run_after # type: ignore + elif isinstance(self.run_after, str): + parsed_run_after = timezone.parse(self.run_after) + try: if self.conf and isinstance(self.conf, str): self.conf = json.loads(self.conf) @@ -247,7 +274,7 @@ def execute(self, context: Context): run_id = DagRun.generate_run_id( run_type=DagRunType.MANUAL, logical_date=parsed_logical_date, - run_after=parsed_logical_date or timezone.utcnow(), + run_after=parsed_run_after or timezone.utcnow(), ) else: run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_logical_date or timezone.utcnow()) # type: ignore[misc,call-arg] @@ -267,14 +294,17 @@ def execute(self, context: Context): if AIRFLOW_V_3_0_PLUS: self._trigger_dag_af_3( - context=context, run_id=self.trigger_run_id, parsed_logical_date=parsed_logical_date + context=context, + run_id=self.trigger_run_id, + parsed_logical_date=parsed_logical_date, + parsed_run_after=parsed_run_after if self.run_after is not NOTSET else None, ) else: self._trigger_dag_af_2( context=context, run_id=self.trigger_run_id, parsed_logical_date=parsed_logical_date ) - def _trigger_dag_af_3(self, context, run_id, parsed_logical_date): + def _trigger_dag_af_3(self, context, run_id, parsed_logical_date, parsed_run_after=None): from airflow.providers.common.compat.sdk import DagRunTriggerException kwargs_accepted = dict( @@ -291,16 +321,28 @@ def _trigger_dag_af_3(self, context, run_id, parsed_logical_date): deferrable=self.deferrable, ) - if self.note and "note" in inspect.signature(DagRunTriggerException.__init__).parameters: + parameters = inspect.signature(DagRunTriggerException.__init__).parameters + if self.note and "note" in parameters: kwargs_accepted["note"] = self.note + if parsed_run_after and "run_after" in parameters: + kwargs_accepted["run_after"] = parsed_run_after + raise DagRunTriggerException(**kwargs_accepted) def _trigger_dag_af_2(self, context, run_id, parsed_logical_date): try: - if self.note: - self.log.warning("Parameter 'note' is not supported in Airflow 2.x and will be ignored.") - + unsupported_parameters = [] + for attr, default_value in self.attributes_not_suppported_in_airflow_2.items(): + value = getattr(self, attr, default_value) + if value is not default_value: + unsupported_parameters.append(attr) + + if unsupported_parameters: + self.log.warning( + "The following parameters are not supported in Airflow 2.x and will be ignored: %s", + ", ".join(unsupported_parameters), + ) dag_run = trigger_dag( dag_id=self.trigger_dag_id, run_id=run_id, diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index 29b4132b23557..d5235a468db99 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -104,6 +104,44 @@ def teardown_method(self): session.execute(delete(DagBundleModel)) session.commit() + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3") + def test_trigger_dagrun_with_run_after(self): + """ + Test TriggerDagRunOperator. + + We only verify that the operator runs and raises correct exception. The actual execution logic + after the exception is in Task SDK code. + """ + with time_machine.travel("2025-02-18T08:04:46Z", tick=False): + task = TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id=TRIGGERED_DAG_ID, + conf={"foo": "bar"}, + run_after=timezone.datetime(2025, 2, 19, 12, 0, 0), + ) + + # Ensure correct exception is raised + with pytest.raises(DagRunTriggerException) as exc_info: + task.execute(context={}) + + assert exc_info.value.trigger_dag_id == TRIGGERED_DAG_ID + assert exc_info.value.conf == {"foo": "bar"} + assert exc_info.value.logical_date is None + assert exc_info.value.reset_dag_run is False + assert exc_info.value.skip_when_already_exists is False + assert exc_info.value.wait_for_completion is False + assert exc_info.value.allowed_states == [DagRunState.SUCCESS] + assert exc_info.value.failed_states == [DagRunState.FAILED] + if getattr(exc_info, "note", None) is not None: + assert exc_info.value.note == "Test note" + + expected_run_id = DagRun.generate_run_id( + run_type=DagRunType.MANUAL, run_after=task.run_after + ).rsplit("_", 1)[0] + # rsplit because last few characters are random. + assert exc_info.value.dag_run_id.rsplit("_", 1)[0] == expected_run_id + assert task.trigger_run_id.rsplit("_", 1)[0] == expected_run_id # run_id is saved as attribute + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3") def test_trigger_dagrun(self): """ From fd132804b20fd9ce2e9512b1dad73f6808774a19 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Sat, 21 Feb 2026 09:34:14 -0500 Subject: [PATCH 03/12] feat: add run_after to TriggerDagRunPayload --- .../src/airflow/api_fastapi/execution_api/datamodels/dagrun.py | 1 + task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 1 + 2 files changed, 2 insertions(+) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py index 8619901f717f7..dd1f1aeb86665 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py @@ -28,6 +28,7 @@ class TriggerDAGRunPayload(StrictBaseModel): """Schema for Trigger DAG Run API request.""" logical_date: UtcDateTime | None = None + run_after: UtcDateTime | None = None conf: dict = Field(default_factory=dict) reset_dag_run: bool = False partition_key: str | None = None diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index d45f82393067c..6dcd6925602c1 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -371,6 +371,7 @@ class TriggerDAGRunPayload(BaseModel): extra="forbid", ) logical_date: Annotated[AwareDatetime | None, Field(title="Logical Date")] = None + run_after: Annotated[AwareDatetime | None, Field(title="Run After")] = None conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None reset_dag_run: Annotated[bool | None, Field(title="Reset Dag Run")] = False partition_key: Annotated[str | None, Field(title="Partition Key")] = None From 3f1ba4ae1d9657ea1f294bbdf8a243d733b39d74 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Sat, 21 Feb 2026 09:36:27 -0500 Subject: [PATCH 04/12] feat: add run_after support to task_runner and supervisor --- .../execution_api/routes/dag_runs.py | 1 + task-sdk/src/airflow/sdk/api/client.py | 7 +++- .../airflow/sdk/execution_time/supervisor.py | 2 +- .../airflow/sdk/execution_time/task_runner.py | 1 + .../execution_time/test_supervisor.py | 41 +++++++++++++++++-- 5 files changed, 46 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index 448d0b6945abf..d4a0d24024bf9 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -128,6 +128,7 @@ def trigger_dag_run( run_type=DagRunType.OPERATOR_TRIGGERED, conf=payload.conf, logical_date=payload.logical_date, + run_after=payload.run_after, triggered_by=DagRunTriggeredByType.OPERATOR, replace_microseconds=False, partition_key=payload.partition_key, diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index d0362ce12aa61..a662b9006646d 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -711,12 +711,17 @@ def trigger( run_id: str, conf: dict | None = None, logical_date: datetime | None = None, + run_after: datetime | None = None, reset_dag_run: bool = False, note: str | None = None, ) -> OKResponse | ErrorResponse: """Trigger a Dag run via the API server.""" body = TriggerDAGRunPayload( - logical_date=logical_date, conf=conf or {}, reset_dag_run=reset_dag_run, note=note + logical_date=logical_date, + conf=conf or {}, + reset_dag_run=reset_dag_run, + note=note, + run_after=run_after, ) try: diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 8675305dde967..febe58f8f955d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1402,7 +1402,7 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: dump_opts = {"exclude_unset": True} elif isinstance(msg, TriggerDagRun): resp = self.client.dag_runs.trigger( - msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.reset_dag_run, msg.note + msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.run_after, msg.reset_dag_run, msg.note ) elif isinstance(msg, GetDagRun): dr_resp = self.client.dag_runs.get_detail(msg.dag_id, msg.run_id) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 73e8fd0cbf62e..69dd252607a13 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1435,6 +1435,7 @@ def _handle_trigger_dag_run( dag_id=drte.trigger_dag_id, run_id=drte.dag_run_id, logical_date=drte.logical_date, + run_after=drte.run_after, conf=drte.conf, reset_dag_run=drte.reset_dag_run, note=drte.note, diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index e88b5c794cfda..b750c3b53ac4b 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2097,15 +2097,24 @@ class RequestTestCase: run_id="test_run", conf={"key": "value"}, logical_date=timezone.datetime(2025, 1, 1), + run_after=timezone.datetime(2025, 1, 1, 12, 0, 0), reset_dag_run=True, ), expected_body={"ok": True, "type": "OKResponse"}, client_mock=ClientMock( method_path="dag_runs.trigger", - args=("test_dag", "test_run", {"key": "value"}, timezone.datetime(2025, 1, 1), True, None), + args=( + "test_dag", + "test_run", + {"key": "value"}, + timezone.datetime(2025, 1, 1), + timezone.datetime(2025, 1, 1, 12, 0, 0), + True, + None, + ), response=OKResponse(ok=True), ), - test_id="dag_run_trigger", + test_id="dag_run_trigger_with_run_after", ), RequestTestCase( message=TriggerDagRun( @@ -2119,7 +2128,31 @@ class RequestTestCase: expected_body={"ok": True, "type": "OKResponse"}, client_mock=ClientMock( method_path="dag_runs.trigger", - args=("test_dag", "test_run", {"key": "value"}, timezone.datetime(2025, 1, 1), True, "Test Note"), + args=( + "test_dag", + "test_run", + {"key": "value"}, + timezone.datetime(2025, 1, 1), + None, + True, + "Test Note", + ), + response=OKResponse(ok=True), + ), + test_id="dag_run_trigger", + ), + RequestTestCase( + message=TriggerDagRun( + dag_id="test_dag", + run_id="test_run", + conf={"key": "value"}, + logical_date=timezone.datetime(2025, 1, 1), + reset_dag_run=True, + ), + expected_body={"ok": True, "type": "OKResponse"}, + client_mock=ClientMock( + method_path="dag_runs.trigger", + args=("test_dag", "test_run", {"key": "value"}, timezone.datetime(2025, 1, 1), None, True, None), response=OKResponse(ok=True), ), test_id="dag_run_trigger", @@ -2129,7 +2162,7 @@ class RequestTestCase: expected_body={"error": "DAGRUN_ALREADY_EXISTS", "detail": None, "type": "ErrorResponse"}, client_mock=ClientMock( method_path="dag_runs.trigger", - args=("test_dag", "test_run", None, None, False, None), + args=("test_dag", "test_run", None, None, None, False, None), response=ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS), ), test_id="dag_run_trigger_already_exists", From 778993d2ec7d1039aec7aafb71ca456b97380ade Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Thu, 12 Mar 2026 19:13:56 -0400 Subject: [PATCH 05/12] feat: Add _validate_datetime_param --- .../standard/operators/trigger_dagrun.py | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index b6f0431095aa7..dd392e4a48472 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -23,7 +23,7 @@ import time from collections.abc import Sequence from json import JSONDecodeError -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from sqlalchemy import select from sqlalchemy.orm.exc import NoResultFound @@ -212,26 +212,10 @@ def __init__( self.openlineage_inject_parent_info = openlineage_inject_parent_info self.note = note self.deferrable = deferrable + logical_date = _validate_datetime_param("logical_date", logical_date) + run_after = _validate_datetime_param("run_after", run_after) self.logical_date = logical_date - if logical_date is NOTSET: - self.logical_date = NOTSET - elif logical_date is None or isinstance(logical_date, (str, datetime.datetime)): - self.logical_date = logical_date - else: - raise TypeError( - f"Expected str, datetime.datetime, or None for parameter 'logical_date'. Got {type(logical_date).__name__}" - ) - self.run_after = run_after - if run_after is NOTSET: - self.run_after = NOTSET - elif run_after is None or isinstance(run_after, (str, datetime.datetime)): - self.run_after = run_after - else: - raise TypeError( - f"Expected str, datetime.datetime, or None for parameter 'run_after'. Got {type(run_after).__name__}" - ) - if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS: raise NotImplementedError("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.x") @@ -495,3 +479,13 @@ def _trigger_dag_run_af_2_execute_complete( f"{self.trigger_dag_id} return {state} which is not in {self.failed_states}" f" or {self.allowed_states}" ) + + +def _validate_datetime_param(name: str, value): + if value is NOTSET: + return NOTSET + if value is None or isinstance(value, (str, datetime.datetime)): + return value + raise TypeError( + f"Expected str, datetime.datetime, or None for parameter '{name}'. Got {type(value).__name__}" + ) From 59f9404c0b128627090edf8840680c4a7bc5568e Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Thu, 12 Mar 2026 19:16:06 -0400 Subject: [PATCH 06/12] feat: Add _parse_datetime_param --- .../standard/operators/trigger_dagrun.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index dd392e4a48472..b07a4ef5d6c1a 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -226,17 +226,15 @@ def execute(self, context: Context): else: # If no logical_date is provided we will set utcnow() parsed_logical_date = timezone.utcnow() - elif self.logical_date is None or isinstance(self.logical_date, datetime.datetime): - parsed_logical_date = self.logical_date # type: ignore - elif isinstance(self.logical_date, str): - parsed_logical_date = timezone.parse(self.logical_date) + else: + logical_date = cast("str | datetime.datetime | None", self.logical_date) + parsed_logical_date = _parse_datetime_param(logical_date) if self.run_after is NOTSET: parsed_run_after = parsed_logical_date - elif self.run_after is None or isinstance(self.run_after, datetime.datetime): - parsed_run_after = self.run_after # type: ignore - elif isinstance(self.run_after, str): - parsed_run_after = timezone.parse(self.run_after) + else: + run_after = cast("str | datetime.datetime | None", self.run_after) + parsed_run_after = _parse_datetime_param(run_after) try: if self.conf and isinstance(self.conf, str): @@ -489,3 +487,10 @@ def _validate_datetime_param(name: str, value): raise TypeError( f"Expected str, datetime.datetime, or None for parameter '{name}'. Got {type(value).__name__}" ) + + +def _parse_datetime_param(value: str | datetime.datetime | None): + if value is None or isinstance(value, datetime.datetime): + return value + if isinstance(value, str): + return timezone.parse(value) From 1eb36d508c127691ed0cb0313b82805b2a503603 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Thu, 2 Apr 2026 12:01:09 -0400 Subject: [PATCH 07/12] feat: add versioning for TriggerDagRunPayload --- .../execution_api/versions/__init__.py | 2 ++ .../execution_api/versions/v2026_04_06.py | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 646831501a7ec..40a5c2823a6e4 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -33,6 +33,7 @@ AddDagRunDetailEndpoint, AddNoteField, AddPartitionKeyField, + AddRunAfterField, MakeDagRunStartDateNullable, ModifyDeferredTaskKwargsToJsonValue, MovePreviousRunEndpoint, @@ -50,6 +51,7 @@ ModifyDeferredTaskKwargsToJsonValue, RemoveUpstreamMapIndexesField, AddNoteField, + AddRunAfterField, AddDagEndpoint, ), Version("2025-11-05", AddTriggeringUserNameField), diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py index 59e671f0a24a2..8a3817ac4db53 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py @@ -169,7 +169,10 @@ class AddNoteField(VersionChange): description = __doc__ - instructions_to_migrate_to_previous_version = (schema(DagRun).field("note").didnt_exist,) + instructions_to_migrate_to_previous_version = ( + schema(DagRun).field("note").didnt_exist, + schema(TriggerDAGRunPayload).field("note").didnt_exist, + ) @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] def remove_note_field(response: ResponseInfo) -> None: # type: ignore[misc] @@ -184,3 +187,13 @@ class AddDagEndpoint(VersionChange): description = __doc__ instructions_to_migrate_to_previous_version = (endpoint("/dags/{dag_id}", ["GET"]).didnt_exist,) + + +class AddRunAfterField(VersionChange): + """Add run_after parameter to TriggerDAGRunPayload Model.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = ( + schema(TriggerDAGRunPayload).field("run_after").didnt_exist, + ) From 898208b07ce2b7dad9694607b6de3ee167cdd6e1 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Thu, 2 Apr 2026 12:01:54 -0400 Subject: [PATCH 08/12] fix: add overload to _parse and _validate def --- .../standard/operators/trigger_dagrun.py | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index b07a4ef5d6c1a..0bf919080ce31 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -23,7 +23,7 @@ import time from collections.abc import Sequence from json import JSONDecodeError -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, Any, cast, overload from sqlalchemy import select from sqlalchemy.orm.exc import NoResultFound @@ -44,6 +44,7 @@ from airflow.providers.standard.triggers.external_task import DagStateTrigger from airflow.providers.standard.utils.openlineage import safe_inject_openlineage_properties_into_dagrun_conf from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator +from airflow.sdk.definitions.param import is_arg_set from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType @@ -479,8 +480,21 @@ def _trigger_dag_run_af_2_execute_complete( ) -def _validate_datetime_param(name: str, value): - if value is NOTSET: +@overload +def _validate_datetime_param(name: str, value: ArgNotSet) -> ArgNotSet: ... +@overload +def _validate_datetime_param(name: str, value: None) -> None: ... +@overload +def _validate_datetime_param(name: str, value: str) -> str: ... +@overload +def _validate_datetime_param(name: str, value: datetime.datetime) -> datetime.datetime: ... + + +def _validate_datetime_param( + name: str, + value: str | datetime.datetime | None | ArgNotSet, +) -> str | datetime.datetime | None | ArgNotSet: + if not is_arg_set(value): return NOTSET if value is None or isinstance(value, (str, datetime.datetime)): return value @@ -489,8 +503,17 @@ def _validate_datetime_param(name: str, value): ) -def _parse_datetime_param(value: str | datetime.datetime | None): +@overload +def _parse_datetime_param(value: None) -> None: ... +@overload +def _parse_datetime_param(value: datetime.datetime) -> datetime.datetime: ... +@overload +def _parse_datetime_param(value: str) -> datetime.datetime: ... + + +def _parse_datetime_param( + value: str | datetime.datetime | None, +) -> datetime.datetime | None: if value is None or isinstance(value, datetime.datetime): return value - if isinstance(value, str): - return timezone.parse(value) + return timezone.parse(value) From c6ecaefb2deeb578391c84185672a5f359dfc520 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Thu, 2 Apr 2026 12:02:23 -0400 Subject: [PATCH 09/12] test: update unit test --- .../standard/operators/test_trigger_dagrun.py | 23 +++++++++++++++++-- task-sdk/tests/task_sdk/api/test_client.py | 5 +++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index d5235a468db99..389d8f1f98739 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -595,7 +595,7 @@ def test_trigger_dagrun(self, dag_maker, mock_supervisor_comms): with time_machine.travel("2025-02-18T08:04:46Z", tick=False): with dag_maker(TEST_DAG_ID, default_args={"start_date": DEFAULT_DATE}, serialized=True): task = TriggerDagRunOperator( - task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, note="Test note" + task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, note="Test note", run_after=None ) mock_warning = mock.patch.object(task.log, "warning").start() dag_maker.sync_dagbag_to_db() @@ -604,12 +604,31 @@ def test_trigger_dagrun(self, dag_maker, mock_supervisor_comms): task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) dagrun = dag_maker.session.scalar(select(DagRun).where(DagRun.dag_id == TRIGGERED_DAG_ID)) + unsupported_params = ["note"] assert mock_warning.mock_calls == [ - mock.call("Parameter 'note' is not supported in Airflow 2.x and will be ignored.") + mock.call( + "The following parameters are not supported in Airflow 2.x and will be ignored: %s", + ", ".join(unsupported_params), + ) ] assert dagrun.run_type == DagRunType.MANUAL assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.logical_date) + def test_trigger_dagrun_does_not_warn_for_default_unsupported_params( + self, dag_maker, mock_supervisor_comms + ): + """Test TriggerDagRunOperator does not warn for unsupported params when they are not provided.""" + with time_machine.travel("2025-02-18T08:04:46Z", tick=False): + with dag_maker(TEST_DAG_ID, default_args={"start_date": DEFAULT_DATE}, serialized=True): + task = TriggerDagRunOperator(task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID) + mock_warning = mock.patch.object(task.log, "warning").start() + dag_maker.sync_dagbag_to_db() + parse_and_sync_to_db(self.f_name) + dag_maker.create_dagrun() + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + mock_warning.assert_not_called() + def test_explicitly_provided_trigger_run_id_is_saved_as_attr(self, dag_maker, session): with dag_maker(TEST_DAG_ID, default_args={"start_date": DEFAULT_DATE}, serialized=True): task = TriggerDagRunOperator( diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py index 0df8839c55f30..26e2a7e66bb6e 100644 --- a/task-sdk/tests/task_sdk/api/test_client.py +++ b/task-sdk/tests/task_sdk/api/test_client.py @@ -1238,7 +1238,10 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_client(transport=httpx.MockTransport(handle_request)) result = client.dag_runs.trigger( - dag_id="test_trigger", run_id="test_run_id", logical_date=timezone.datetime(2025, 1, 1) + dag_id="test_trigger", + run_id="test_run_id", + logical_date=timezone.datetime(2025, 1, 1), + run_after=timezone.datetime(2025, 1, 2), ) assert result == OKResponse(ok=True) From 90457fe2168d42bb1d55dd52c472c30bcd4a59b9 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Fri, 3 Apr 2026 01:26:29 -0400 Subject: [PATCH 10/12] fix: import is_arg_set --- .../providers/standard/operators/trigger_dagrun.py | 3 +-- .../airflow/providers/standard/version_compat.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 0bf919080ce31..c02e2099e3a1b 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -43,8 +43,7 @@ ) from airflow.providers.standard.triggers.external_task import DagStateTrigger from airflow.providers.standard.utils.openlineage import safe_inject_openlineage_properties_into_dagrun_conf -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator -from airflow.sdk.definitions.param import is_arg_set +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator, is_arg_set from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType diff --git a/providers/standard/src/airflow/providers/standard/version_compat.py b/providers/standard/src/airflow/providers/standard/version_compat.py index 769e790fb5972..440c856563dc2 100644 --- a/providers/standard/src/airflow/providers/standard/version_compat.py +++ b/providers/standard/src/airflow/providers/standard/version_compat.py @@ -41,10 +41,18 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: # This is needed for DecoratedOperator compatibility if AIRFLOW_V_3_1_PLUS: from airflow.sdk import BaseOperator - from airflow.sdk.definitions._internal.types import ArgNotSet + from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet else: from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef] - from airflow.utils.types import ArgNotSet # type: ignore[attr-defined,no-redef] + from airflow.utils.types import NOTSET, ArgNotSet # type: ignore[attr-defined,no-redef] + +try: + from airflow.sdk.definitions._internal.types import is_arg_set +except ImportError: + + def is_arg_set(value): # type: ignore[misc,no-redef] + return value is not NOTSET + __all__ = [ "AIRFLOW_V_3_0_PLUS", @@ -52,4 +60,5 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: "AIRFLOW_V_3_2_PLUS", "ArgNotSet", "BaseOperator", + "is_arg_set", ] From 7778a94f32a821e500eebac00989ad5650cfc348 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Mon, 13 Apr 2026 13:40:51 -0400 Subject: [PATCH 11/12] chore: typo modification --- .../src/airflow/providers/standard/operators/trigger_dagrun.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index c02e2099e3a1b..839cb6a25426e 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -315,7 +315,7 @@ def _trigger_dag_af_3(self, context, run_id, parsed_logical_date, parsed_run_aft def _trigger_dag_af_2(self, context, run_id, parsed_logical_date): try: unsupported_parameters = [] - for attr, default_value in self.attributes_not_suppported_in_airflow_2.items(): + for attr, default_value in self.attributes_not_supported_in_airflow_2.items(): value = getattr(self, attr, default_value) if value is not default_value: unsupported_parameters.append(attr) From 2d9b49209da6f4a71734d0f1805178b0dc8aa1eb Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Tue, 14 Apr 2026 01:18:56 -0400 Subject: [PATCH 12/12] fix: fix unit test --- .../tests/unit/standard/operators/test_trigger_dagrun.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index 389d8f1f98739..bcc5ad6153c6e 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -604,7 +604,7 @@ def test_trigger_dagrun(self, dag_maker, mock_supervisor_comms): task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) dagrun = dag_maker.session.scalar(select(DagRun).where(DagRun.dag_id == TRIGGERED_DAG_ID)) - unsupported_params = ["note"] + unsupported_params = ["run_after", "note"] assert mock_warning.mock_calls == [ mock.call( "The following parameters are not supported in Airflow 2.x and will be ignored: %s",