From 92507c2a98b1ee5e77526501e9125749ed74c776 Mon Sep 17 00:00:00 2001 From: safaehar Date: Fri, 5 Jun 2026 11:07:44 +0200 Subject: [PATCH 1/5] feat: add scheduler observability metrics Emit three new metrics from SchedulerJobRunner to improve visibility into scheduler health: - scheduler.loop_exceptions{exception_class}: counter incremented when the scheduler loop exits with an unhandled exception, tagged with the exception class to aid triaging crash loops. - scheduler.executor_events.batch_size (gauge) and scheduler.executor_events.processed (counter): emitted on each successful call to process_executor_events with the event count. scheduler.executor_events.failed{reason} is incremented instead when the call raises, tagged with the exception class. The original body is extracted into _process_executor_events_core so process_executor_events can act as a thin metrics wrapper. - scheduler.zombies.detected{reason}: counter incremented when zombie task instances are detected, tagged with the detection path: - heartbeat_timeout: task exceeded the heartbeat threshold - adopt_failure: orphaned task could not be re-adopted and was reset These metrics are already running in production via local monkey-patches at Datadog; this commit contributes them natively upstream so the patches can eventually be removed. --- .../src/airflow/jobs/scheduler_job_runner.py | 44 +++-- .../tests/unit/jobs/test_scheduler_job.py | 154 ++++++++++++++++++ 2 files changed, 188 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 21ce3f3c582b3..5f2b69c3e1f18 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1223,12 +1223,16 @@ def _is_tracing_enabled(): return conf.getboolean("traces", "otel_on") def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int: - return SchedulerJobRunner.process_executor_events( - executor=executor, - job_id=self.job.id, - scheduler_dag_bag=self.scheduler_dag_bag, - session=session, - ) + try: + return SchedulerJobRunner.process_executor_events( + executor=executor, + job_id=self.job.id, + scheduler_dag_bag=self.scheduler_dag_bag, + session=session, + ) + except Exception as exc: + stats.incr("scheduler.executor_events.failed", tags={"reason": type(exc).__name__}) + raise @classmethod def process_executor_events( @@ -1266,6 +1270,7 @@ def process_executor_events( """ ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] = {} event_buffer = executor.get_event_buffer() + num_events = len(event_buffer) tis_with_right_state: list[TaskInstanceKey] = [] callback_keys_with_events: list[CallbackKey] = [] @@ -1327,12 +1332,16 @@ def process_executor_events( # Return if no finished tasks if not tis_with_right_state: - return len(event_buffer) + stats.gauge("scheduler.executor_events.batch_size", num_events) + stats.incr("scheduler.executor_events.processed", num_events) + return num_events # Check state of finished tasks filter_for_tis = TI.filter_for_tis(tis_with_right_state) if filter_for_tis is None: - return len(event_buffer) + stats.gauge("scheduler.executor_events.batch_size", num_events) + stats.incr("scheduler.executor_events.processed", num_events) + return num_events asset_loader, alias_loader = _eager_load_dag_run_for_validation() query = ( select(TI) @@ -1548,7 +1557,9 @@ def process_executor_events( # Update task state - emails are handled by DAG processor now ti.handle_failure(error=msg, session=session) - return len(event_buffer) + stats.gauge("scheduler.executor_events.batch_size", num_events) + stats.incr("scheduler.executor_events.processed", num_events) + return num_events def _execute(self) -> int | None: import os @@ -1583,7 +1594,8 @@ def _execute(self) -> int | None: if settings.Session is not None: settings.Session.remove() - except Exception: + except Exception as exc: + stats.incr("scheduler.loop_exceptions", tags={"exception_class": type(exc).__name__}) self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop") raise finally: @@ -1824,6 +1836,7 @@ def _run_scheduler_loop(self) -> None: ) break + def _do_scheduling(self, session: Session) -> int: """ Make the main scheduling decisions. @@ -3221,6 +3234,12 @@ def adopt_or_reset_orphaned_tasks(self, *, session: Session = NEW_SESSION) -> in stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset)) stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset)) + if to_reset: + stats.incr( + "scheduler.zombies.detected", + len(to_reset), + tags={"reason": "adopt_failure"}, + ) if to_reset: task_instance_str = "\n\t".join(reset_tis_message) @@ -3369,6 +3388,11 @@ def _find_and_purge_task_instances_without_heartbeats(self) -> None: if task_instances_without_heartbeats := self._find_task_instances_without_heartbeats( session=session ): + stats.incr( + "scheduler.zombies.detected", + len(task_instances_without_heartbeats), + tags={"reason": "heartbeat_timeout"}, + ) self._purge_task_instances_without_heartbeats( task_instances_without_heartbeats, session=session ) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 4ab1bc6415ae4..aa0779cf2a123 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -11942,3 +11942,157 @@ def test_resolve_partition_date(mappers, partition_key, expected): dag_id="test-dag", ) assert result == expected + + +class TestSchedulerObservabilityMetrics: + """Tests for the scheduler observability metrics emitted in scheduler_job_runner.py.""" + + @pytest.fixture(autouse=True) + def per_test(self) -> Generator: + _clean_db() + self.job_runner: SchedulerJobRunner | None = None + yield + _clean_db() + + # --- scheduler.loop_exceptions --- + + def test_loop_exceptions_incr_on_scheduler_loop_failure(self): + """scheduler.loop_exceptions is emitted with exception_class tag when _run_scheduler_loop raises.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch.object(self.job_runner, "register_signals", return_value=MagicMock()), + mock.patch.object(self.job_runner.executor, "start"), + mock.patch.object(self.job_runner.executor, "end"), + mock.patch.object(self.job_runner, "_run_scheduler_loop", side_effect=RuntimeError("loop crash")), + pytest.raises(RuntimeError, match="loop crash"), + ): + self.job_runner._execute() + + mock_stats.incr.assert_any_call("scheduler.loop_exceptions", tags={"exception_class": "RuntimeError"}) + + def test_loop_exceptions_not_emitted_on_clean_exit(self): + """scheduler.loop_exceptions is NOT emitted when _run_scheduler_loop returns normally.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch.object(self.job_runner, "register_signals", return_value=MagicMock()), + mock.patch.object(self.job_runner.executor, "start"), + mock.patch.object(self.job_runner.executor, "end"), + mock.patch.object(self.job_runner, "_run_scheduler_loop"), + ): + self.job_runner._execute() + + emitted_names = [c.args[0] for c in mock_stats.incr.call_args_list] + assert "scheduler.loop_exceptions" not in emitted_names + + # --- scheduler.executor_events.{batch_size,processed,failed} --- + + def test_executor_events_batch_metrics_emitted_on_success(self): + """batch_size gauge and processed counter are emitted via the early-return path.""" + # Empty event buffer → tis_with_right_state is empty → early return with num_events=0 + mock_executor = MagicMock() + mock_executor.get_event_buffer.return_value = {} + + with mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats: + result = SchedulerJobRunner.process_executor_events( + executor=mock_executor, job_id=1, scheduler_dag_bag=MagicMock(), session=MagicMock() + ) + + assert result == 0 + mock_stats.gauge.assert_called_once_with("scheduler.executor_events.batch_size", 0) + mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", 0) + + def test_executor_events_failed_metric_emitted_on_exception(self): + """failed counter is emitted in _process_executor_events when process_executor_events raises.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch.object( + SchedulerJobRunner, "process_executor_events", side_effect=ValueError("executor boom") + ), + pytest.raises(ValueError, match="executor boom"), + ): + self.job_runner._process_executor_events(executor=MagicMock(), session=MagicMock()) + + mock_stats.incr.assert_called_once_with( + "scheduler.executor_events.failed", tags={"reason": "ValueError"} + ) + mock_stats.gauge.assert_not_called() + + # --- scheduler.zombies.detected --- + + def test_zombies_detected_heartbeat_timeout_emitted(self): + """scheduler.zombies.detected{reason:heartbeat_timeout} is emitted when zombies are found.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + fake_tis = [MagicMock(), MagicMock(), MagicMock()] + + mock_session = MagicMock() + mock_ctx = MagicMock() + mock_ctx.__enter__ = MagicMock(return_value=mock_session) + mock_ctx.__exit__ = MagicMock(return_value=False) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch("airflow.jobs.scheduler_job_runner.create_session", return_value=mock_ctx), + mock.patch.object( + self.job_runner, "_find_task_instances_without_heartbeats", return_value=fake_tis + ), + mock.patch.object(self.job_runner, "_purge_task_instances_without_heartbeats"), + ): + self.job_runner._find_and_purge_task_instances_without_heartbeats() + + mock_stats.incr.assert_called_once_with( + "scheduler.zombies.detected", 3, tags={"reason": "heartbeat_timeout"} + ) + + def test_zombies_detected_not_emitted_when_no_heartbeat_timeout(self): + """scheduler.zombies.detected is NOT emitted when no zombie task instances are found.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + mock_session = MagicMock() + mock_ctx = MagicMock() + mock_ctx.__enter__ = MagicMock(return_value=mock_session) + mock_ctx.__exit__ = MagicMock(return_value=False) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch("airflow.jobs.scheduler_job_runner.create_session", return_value=mock_ctx), + mock.patch.object(self.job_runner, "_find_task_instances_without_heartbeats", return_value=[]), + ): + self.job_runner._find_and_purge_task_instances_without_heartbeats() + + mock_stats.incr.assert_not_called() + + def test_zombies_detected_adopt_failure_emitted(self, dag_maker, session): + """scheduler.zombies.detected{reason:adopt_failure} is emitted for tasks that can't be adopted.""" + with dag_maker(dag_id="test_zombie_adopt_failure", schedule="@daily"): + EmptyOperator(task_id="task1") + + old_job = Job() + session.add(old_job) + session.commit() + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + dr = dag_maker.create_dagrun(run_type=DagRunType.MANUAL) + ti = dr.get_task_instances(session=session)[0] + ti.state = TaskInstanceState.QUEUED + ti.queued_by_job_id = old_job.id + session.merge(ti) + session.commit() + + with mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats: + num_reset = self.job_runner.adopt_or_reset_orphaned_tasks(session=session) + + assert num_reset == 1 + mock_stats.incr.assert_any_call("scheduler.zombies.detected", 1, tags={"reason": "adopt_failure"}) From 01c79925595db918859e7c593648c8ddf45ca1a0 Mon Sep 17 00:00:00 2001 From: safaehar Date: Fri, 5 Jun 2026 12:18:42 +0200 Subject: [PATCH 2/5] Fix test assertions broken by new scheduler.executor_events.processed metric Three existing tests used mock_stats.incr.assert_not_called() to verify that no error/mismatch metrics were emitted in requeued-TI and stale-success scenarios. The new executor_events.processed counter now fires unconditionally for every _process_executor_events call, so those blanket assertions became too broad. Replace each with an assertion that permits the expected counter while still verifying that no scheduler.tasks.killed_externally metric fired. --- .../tests/unit/jobs/test_scheduler_job.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index aa0779cf2a123..02bea8ba47265 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -860,7 +860,12 @@ def test_process_executor_events_ti_requeued( ti1.refresh_from_db(session=session) assert ti1.state == State.QUEUED self.job_runner.executor.callback_sink.send.assert_not_called() - mock_stats.incr.assert_not_called() + # Only the processed-events counter should have fired across all three sub-tests; + # no killed_externally mismatch metric should appear. + assert all( + c.args[0] == "scheduler.executor_events.processed" + for c in mock_stats.incr.call_args_list + ) @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") @mock.patch("airflow._shared.observability.metrics.stats._get_backend") @@ -905,7 +910,9 @@ def test_process_executor_events_stale_success_when_scheduled_after_defer( ti1.refresh_from_db(session=session) assert ti1.state == State.SCHEDULED self.job_runner.executor.callback_sink.send.assert_not_called() - mock_stats.incr.assert_not_called() + # Stale success from defer exit must not trigger a mismatch metric — + # only the standard processed-events counter should fire. + mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", count=1) # Without next_method, scheduled + stale success is still a mismatch (e.g. external kill). ti1.next_method = None @@ -957,7 +964,9 @@ def test_process_executor_events_multiple_try_numbers_warns( for rec in caplog.records ) mock_task_callback.assert_not_called() - mock_stats.incr.assert_not_called() + # Only the processed-events counter should fire; duplicate try_number events + # must not trigger any error/mismatch metrics. + mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", count=2) @pytest.mark.usefixtures("testing_dag_bundle") def test_process_executor_events_with_asset_events(self, session, dag_maker): From 71c00e1c105648791f8448b641f8d1fcfa24c529 Mon Sep 17 00:00:00 2001 From: safaehar Date: Tue, 9 Jun 2026 11:35:19 +0200 Subject: [PATCH 3/5] Fix static checks: register new metrics in registry and apply ruff format --- .../src/airflow/jobs/scheduler_job_runner.py | 1 - .../tests/unit/jobs/test_scheduler_job.py | 5 +-- .../metrics/metrics_template.yaml | 33 +++++++++++++++++++ 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 5f2b69c3e1f18..541b1e18b39e8 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1836,7 +1836,6 @@ def _run_scheduler_loop(self) -> None: ) break - def _do_scheduling(self, session: Session) -> int: """ Make the main scheduling decisions. diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 02bea8ba47265..f5d9cdcb709a7 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -862,10 +862,7 @@ def test_process_executor_events_ti_requeued( self.job_runner.executor.callback_sink.send.assert_not_called() # Only the processed-events counter should have fired across all three sub-tests; # no killed_externally mismatch metric should appear. - assert all( - c.args[0] == "scheduler.executor_events.processed" - for c in mock_stats.incr.call_args_list - ) + assert all(c.args[0] == "scheduler.executor_events.processed" for c in mock_stats.incr.call_args_list) @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") @mock.patch("airflow._shared.observability.metrics.stats._get_backend") diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index f4fe52e529848..ce41f907966ea 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -163,6 +163,33 @@ metrics: legacy_name: "-" name_variables: [] + - name: "scheduler.loop_exceptions" + description: "Number of times the scheduler loop exited with an unhandled exception. + Metric with exception_class tagging." + type: "counter" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.executor_events.processed" + description: "Number of executor events processed per ``process_executor_events`` call." + type: "counter" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.executor_events.failed" + description: "Number of times ``process_executor_events`` raised an exception. + Metric with reason tagging." + type: "counter" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.zombies.detected" + description: "Number of zombie task instances detected by the Scheduler. + Metric with reason tagging (``heartbeat_timeout`` or ``adopt_failure``)." + type: "counter" + legacy_name: "-" + name_variables: [] + - name: "scheduler.critical_section_busy" description: "Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process." @@ -394,6 +421,12 @@ metrics: legacy_name: "-" name_variables: [] + - name: "scheduler.executor_events.batch_size" + description: "Number of executor events in the batch processed per ``process_executor_events`` call." + type: "gauge" + legacy_name: "-" + name_variables: [] + - name: "scheduler.tasks.executable" description: "Number of tasks that are ready for execution (set to queued) with respect to pool limits, Dag concurrency, executor state, and priority." From 18648f90d95c553c2f3290d7021c5546780d862e Mon Sep 17 00:00:00 2001 From: safaehar Date: Thu, 11 Jun 2026 11:03:44 +0200 Subject: [PATCH 4/5] Address review: preserve event_buffer return contract, merge to_reset blocks --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 541b1e18b39e8..153813fe36f37 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1334,14 +1334,14 @@ def process_executor_events( if not tis_with_right_state: stats.gauge("scheduler.executor_events.batch_size", num_events) stats.incr("scheduler.executor_events.processed", num_events) - return num_events + return len(event_buffer) # Check state of finished tasks filter_for_tis = TI.filter_for_tis(tis_with_right_state) if filter_for_tis is None: stats.gauge("scheduler.executor_events.batch_size", num_events) stats.incr("scheduler.executor_events.processed", num_events) - return num_events + return len(event_buffer) asset_loader, alias_loader = _eager_load_dag_run_for_validation() query = ( select(TI) @@ -1559,7 +1559,7 @@ def process_executor_events( stats.gauge("scheduler.executor_events.batch_size", num_events) stats.incr("scheduler.executor_events.processed", num_events) - return num_events + return len(event_buffer) def _execute(self) -> int | None: import os @@ -3239,8 +3239,6 @@ def adopt_or_reset_orphaned_tasks(self, *, session: Session = NEW_SESSION) -> in len(to_reset), tags={"reason": "adopt_failure"}, ) - - if to_reset: task_instance_str = "\n\t".join(reset_tis_message) self.log.info( "Reset the following %s orphaned TaskInstances:\n\t%s", From 718bda74ab0986b796f2a6a0399074c8533d7cba Mon Sep 17 00:00:00 2001 From: safaehar Date: Thu, 11 Jun 2026 18:16:07 +0200 Subject: [PATCH 5/5] Address review: standardize to exception_class tag, remove adopt_failure zombie metric --- .../src/airflow/jobs/scheduler_job_runner.py | 7 +---- .../tests/unit/jobs/test_scheduler_job.py | 26 +------------------ .../metrics/metrics_template.yaml | 2 +- 3 files changed, 3 insertions(+), 32 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 153813fe36f37..2ba901ef548b3 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1231,7 +1231,7 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) -> session=session, ) except Exception as exc: - stats.incr("scheduler.executor_events.failed", tags={"reason": type(exc).__name__}) + stats.incr("scheduler.executor_events.failed", tags={"exception_class": type(exc).__name__}) raise @classmethod @@ -3234,11 +3234,6 @@ def adopt_or_reset_orphaned_tasks(self, *, session: Session = NEW_SESSION) -> in stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset)) stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset)) if to_reset: - stats.incr( - "scheduler.zombies.detected", - len(to_reset), - tags={"reason": "adopt_failure"}, - ) task_instance_str = "\n\t".join(reset_tis_message) self.log.info( "Reset the following %s orphaned TaskInstances:\n\t%s", diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index f5d9cdcb709a7..f4b84f5a3bd0e 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -12028,7 +12028,7 @@ def test_executor_events_failed_metric_emitted_on_exception(self): self.job_runner._process_executor_events(executor=MagicMock(), session=MagicMock()) mock_stats.incr.assert_called_once_with( - "scheduler.executor_events.failed", tags={"reason": "ValueError"} + "scheduler.executor_events.failed", tags={"exception_class": "ValueError"} ) mock_stats.gauge.assert_not_called() @@ -12078,27 +12078,3 @@ def test_zombies_detected_not_emitted_when_no_heartbeat_timeout(self): mock_stats.incr.assert_not_called() - def test_zombies_detected_adopt_failure_emitted(self, dag_maker, session): - """scheduler.zombies.detected{reason:adopt_failure} is emitted for tasks that can't be adopted.""" - with dag_maker(dag_id="test_zombie_adopt_failure", schedule="@daily"): - EmptyOperator(task_id="task1") - - old_job = Job() - session.add(old_job) - session.commit() - - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job) - - dr = dag_maker.create_dagrun(run_type=DagRunType.MANUAL) - ti = dr.get_task_instances(session=session)[0] - ti.state = TaskInstanceState.QUEUED - ti.queued_by_job_id = old_job.id - session.merge(ti) - session.commit() - - with mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats: - num_reset = self.job_runner.adopt_or_reset_orphaned_tasks(session=session) - - assert num_reset == 1 - mock_stats.incr.assert_any_call("scheduler.zombies.detected", 1, tags={"reason": "adopt_failure"}) diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index ce41f907966ea..5e873d83768db 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -185,7 +185,7 @@ metrics: - name: "scheduler.zombies.detected" description: "Number of zombie task instances detected by the Scheduler. - Metric with reason tagging (``heartbeat_timeout`` or ``adopt_failure``)." + Metric with reason tagging (``heartbeat_timeout``)." type: "counter" legacy_name: "-" name_variables: []