diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 21ce3f3c582b3..153813fe36f37 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1223,12 +1223,16 @@ def _is_tracing_enabled(): return conf.getboolean("traces", "otel_on") def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int: - return SchedulerJobRunner.process_executor_events( - executor=executor, - job_id=self.job.id, - scheduler_dag_bag=self.scheduler_dag_bag, - session=session, - ) + try: + return SchedulerJobRunner.process_executor_events( + executor=executor, + job_id=self.job.id, + scheduler_dag_bag=self.scheduler_dag_bag, + session=session, + ) + except Exception as exc: + stats.incr("scheduler.executor_events.failed", tags={"reason": type(exc).__name__}) + raise @classmethod def process_executor_events( @@ -1266,6 +1270,7 @@ def process_executor_events( """ ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] = {} event_buffer = executor.get_event_buffer() + num_events = len(event_buffer) tis_with_right_state: list[TaskInstanceKey] = [] callback_keys_with_events: list[CallbackKey] = [] @@ -1327,11 +1332,15 @@ def process_executor_events( # Return if no finished tasks if not tis_with_right_state: + stats.gauge("scheduler.executor_events.batch_size", num_events) + stats.incr("scheduler.executor_events.processed", num_events) return len(event_buffer) # Check state of finished tasks filter_for_tis = TI.filter_for_tis(tis_with_right_state) if filter_for_tis is None: + stats.gauge("scheduler.executor_events.batch_size", num_events) + stats.incr("scheduler.executor_events.processed", num_events) return len(event_buffer) asset_loader, alias_loader = _eager_load_dag_run_for_validation() query = ( @@ -1548,6 +1557,8 @@ def process_executor_events( # Update task state - emails are handled by DAG processor now ti.handle_failure(error=msg, session=session) + stats.gauge("scheduler.executor_events.batch_size", num_events) + stats.incr("scheduler.executor_events.processed", num_events) return len(event_buffer) def _execute(self) -> int | None: @@ -1583,7 +1594,8 @@ def _execute(self) -> int | None: if settings.Session is not None: settings.Session.remove() - except Exception: + except Exception as exc: + stats.incr("scheduler.loop_exceptions", tags={"exception_class": type(exc).__name__}) self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop") raise finally: @@ -3221,8 +3233,12 @@ def adopt_or_reset_orphaned_tasks(self, *, session: Session = NEW_SESSION) -> in stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset)) stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset)) - if to_reset: + stats.incr( + "scheduler.zombies.detected", + len(to_reset), + tags={"reason": "adopt_failure"}, + ) task_instance_str = "\n\t".join(reset_tis_message) self.log.info( "Reset the following %s orphaned TaskInstances:\n\t%s", @@ -3369,6 +3385,11 @@ def _find_and_purge_task_instances_without_heartbeats(self) -> None: if task_instances_without_heartbeats := self._find_task_instances_without_heartbeats( session=session ): + stats.incr( + "scheduler.zombies.detected", + len(task_instances_without_heartbeats), + tags={"reason": "heartbeat_timeout"}, + ) self._purge_task_instances_without_heartbeats( task_instances_without_heartbeats, session=session ) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 4ab1bc6415ae4..f5d9cdcb709a7 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -860,7 +860,9 @@ def test_process_executor_events_ti_requeued( ti1.refresh_from_db(session=session) assert ti1.state == State.QUEUED self.job_runner.executor.callback_sink.send.assert_not_called() - mock_stats.incr.assert_not_called() + # Only the processed-events counter should have fired across all three sub-tests; + # no killed_externally mismatch metric should appear. + assert all(c.args[0] == "scheduler.executor_events.processed" for c in mock_stats.incr.call_args_list) @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") @mock.patch("airflow._shared.observability.metrics.stats._get_backend") @@ -905,7 +907,9 @@ def test_process_executor_events_stale_success_when_scheduled_after_defer( ti1.refresh_from_db(session=session) assert ti1.state == State.SCHEDULED self.job_runner.executor.callback_sink.send.assert_not_called() - mock_stats.incr.assert_not_called() + # Stale success from defer exit must not trigger a mismatch metric — + # only the standard processed-events counter should fire. + mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", count=1) # Without next_method, scheduled + stale success is still a mismatch (e.g. external kill). ti1.next_method = None @@ -957,7 +961,9 @@ def test_process_executor_events_multiple_try_numbers_warns( for rec in caplog.records ) mock_task_callback.assert_not_called() - mock_stats.incr.assert_not_called() + # Only the processed-events counter should fire; duplicate try_number events + # must not trigger any error/mismatch metrics. + mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", count=2) @pytest.mark.usefixtures("testing_dag_bundle") def test_process_executor_events_with_asset_events(self, session, dag_maker): @@ -11942,3 +11948,157 @@ def test_resolve_partition_date(mappers, partition_key, expected): dag_id="test-dag", ) assert result == expected + + +class TestSchedulerObservabilityMetrics: + """Tests for the scheduler observability metrics emitted in scheduler_job_runner.py.""" + + @pytest.fixture(autouse=True) + def per_test(self) -> Generator: + _clean_db() + self.job_runner: SchedulerJobRunner | None = None + yield + _clean_db() + + # --- scheduler.loop_exceptions --- + + def test_loop_exceptions_incr_on_scheduler_loop_failure(self): + """scheduler.loop_exceptions is emitted with exception_class tag when _run_scheduler_loop raises.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch.object(self.job_runner, "register_signals", return_value=MagicMock()), + mock.patch.object(self.job_runner.executor, "start"), + mock.patch.object(self.job_runner.executor, "end"), + mock.patch.object(self.job_runner, "_run_scheduler_loop", side_effect=RuntimeError("loop crash")), + pytest.raises(RuntimeError, match="loop crash"), + ): + self.job_runner._execute() + + mock_stats.incr.assert_any_call("scheduler.loop_exceptions", tags={"exception_class": "RuntimeError"}) + + def test_loop_exceptions_not_emitted_on_clean_exit(self): + """scheduler.loop_exceptions is NOT emitted when _run_scheduler_loop returns normally.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch.object(self.job_runner, "register_signals", return_value=MagicMock()), + mock.patch.object(self.job_runner.executor, "start"), + mock.patch.object(self.job_runner.executor, "end"), + mock.patch.object(self.job_runner, "_run_scheduler_loop"), + ): + self.job_runner._execute() + + emitted_names = [c.args[0] for c in mock_stats.incr.call_args_list] + assert "scheduler.loop_exceptions" not in emitted_names + + # --- scheduler.executor_events.{batch_size,processed,failed} --- + + def test_executor_events_batch_metrics_emitted_on_success(self): + """batch_size gauge and processed counter are emitted via the early-return path.""" + # Empty event buffer → tis_with_right_state is empty → early return with num_events=0 + mock_executor = MagicMock() + mock_executor.get_event_buffer.return_value = {} + + with mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats: + result = SchedulerJobRunner.process_executor_events( + executor=mock_executor, job_id=1, scheduler_dag_bag=MagicMock(), session=MagicMock() + ) + + assert result == 0 + mock_stats.gauge.assert_called_once_with("scheduler.executor_events.batch_size", 0) + mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", 0) + + def test_executor_events_failed_metric_emitted_on_exception(self): + """failed counter is emitted in _process_executor_events when process_executor_events raises.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch.object( + SchedulerJobRunner, "process_executor_events", side_effect=ValueError("executor boom") + ), + pytest.raises(ValueError, match="executor boom"), + ): + self.job_runner._process_executor_events(executor=MagicMock(), session=MagicMock()) + + mock_stats.incr.assert_called_once_with( + "scheduler.executor_events.failed", tags={"reason": "ValueError"} + ) + mock_stats.gauge.assert_not_called() + + # --- scheduler.zombies.detected --- + + def test_zombies_detected_heartbeat_timeout_emitted(self): + """scheduler.zombies.detected{reason:heartbeat_timeout} is emitted when zombies are found.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + fake_tis = [MagicMock(), MagicMock(), MagicMock()] + + mock_session = MagicMock() + mock_ctx = MagicMock() + mock_ctx.__enter__ = MagicMock(return_value=mock_session) + mock_ctx.__exit__ = MagicMock(return_value=False) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch("airflow.jobs.scheduler_job_runner.create_session", return_value=mock_ctx), + mock.patch.object( + self.job_runner, "_find_task_instances_without_heartbeats", return_value=fake_tis + ), + mock.patch.object(self.job_runner, "_purge_task_instances_without_heartbeats"), + ): + self.job_runner._find_and_purge_task_instances_without_heartbeats() + + mock_stats.incr.assert_called_once_with( + "scheduler.zombies.detected", 3, tags={"reason": "heartbeat_timeout"} + ) + + def test_zombies_detected_not_emitted_when_no_heartbeat_timeout(self): + """scheduler.zombies.detected is NOT emitted when no zombie task instances are found.""" + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + mock_session = MagicMock() + mock_ctx = MagicMock() + mock_ctx.__enter__ = MagicMock(return_value=mock_session) + mock_ctx.__exit__ = MagicMock(return_value=False) + + with ( + mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats, + mock.patch("airflow.jobs.scheduler_job_runner.create_session", return_value=mock_ctx), + mock.patch.object(self.job_runner, "_find_task_instances_without_heartbeats", return_value=[]), + ): + self.job_runner._find_and_purge_task_instances_without_heartbeats() + + mock_stats.incr.assert_not_called() + + def test_zombies_detected_adopt_failure_emitted(self, dag_maker, session): + """scheduler.zombies.detected{reason:adopt_failure} is emitted for tasks that can't be adopted.""" + with dag_maker(dag_id="test_zombie_adopt_failure", schedule="@daily"): + EmptyOperator(task_id="task1") + + old_job = Job() + session.add(old_job) + session.commit() + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + dr = dag_maker.create_dagrun(run_type=DagRunType.MANUAL) + ti = dr.get_task_instances(session=session)[0] + ti.state = TaskInstanceState.QUEUED + ti.queued_by_job_id = old_job.id + session.merge(ti) + session.commit() + + with mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats: + num_reset = self.job_runner.adopt_or_reset_orphaned_tasks(session=session) + + assert num_reset == 1 + mock_stats.incr.assert_any_call("scheduler.zombies.detected", 1, tags={"reason": "adopt_failure"}) diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index f4fe52e529848..ce41f907966ea 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -163,6 +163,33 @@ metrics: legacy_name: "-" name_variables: [] + - name: "scheduler.loop_exceptions" + description: "Number of times the scheduler loop exited with an unhandled exception. + Metric with exception_class tagging." + type: "counter" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.executor_events.processed" + description: "Number of executor events processed per ``process_executor_events`` call." + type: "counter" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.executor_events.failed" + description: "Number of times ``process_executor_events`` raised an exception. + Metric with reason tagging." + type: "counter" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.zombies.detected" + description: "Number of zombie task instances detected by the Scheduler. + Metric with reason tagging (``heartbeat_timeout`` or ``adopt_failure``)." + type: "counter" + legacy_name: "-" + name_variables: [] + - name: "scheduler.critical_section_busy" description: "Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process." @@ -394,6 +421,12 @@ metrics: legacy_name: "-" name_variables: [] + - name: "scheduler.executor_events.batch_size" + description: "Number of executor events in the batch processed per ``process_executor_events`` call." + type: "gauge" + legacy_name: "-" + name_variables: [] + - name: "scheduler.tasks.executable" description: "Number of tasks that are ready for execution (set to queued) with respect to pool limits, Dag concurrency, executor state, and priority."