feat: add scheduler observability metrics #68068
Force-pushed from d018424 to 27131ce.
Pull request overview
Adds scheduler observability metrics in SchedulerJobRunner to improve visibility into scheduler loop failures, executor event processing volume/failures, and zombie task detection, along with unit tests to validate emission.
Changes:
- Emit `scheduler.loop_exceptions{exception_class}` when `_execute()` exits due to an unhandled exception.
- Emit executor event batch metrics (`scheduler.executor_events.batch_size`, `scheduler.executor_events.processed`, `scheduler.executor_events.failed{reason}`) around `process_executor_events()`.
- Emit `scheduler.zombies.detected{reason}` for heartbeat-timeout zombies and orphan adopt/reset failures, with new/updated unit tests.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `airflow-core/src/airflow/jobs/scheduler_job_runner.py` | Adds new scheduler metrics for loop exceptions, executor event processing success/failure, and zombie detection. |
| `airflow-core/tests/unit/jobs/test_scheduler_job.py` | Updates existing executor-event assertions and adds a new test class covering the new metrics. |
    assert all(
        c.args[0] == "scheduler.executor_events.processed"
        for c in mock_stats.incr.call_args_list
    )

    session.commit()

    scheduler_job = Job()
    self.job_runner = SchedulerJobRunner(job=scheduler_job)

@safaehar A few things need addressing before review; see our Pull Request quality criteria.
No rush. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer (a real person) will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

Thanks @potiuk, addressed the failing static checks in ab239f6:
All three hooks ( |
    if to_reset:
        stats.incr(
            "scheduler.zombies.detected",
            len(to_reset),
            tags={"reason": "adopt_failure"},
        )

    if to_reset:
        task_instance_str = "\n\t".join(reset_tis_message)

These two `if to_reset:` blocks can be merged into a single block.
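A minimal sketch of the suggested merge, assuming a hypothetical helper `report_unadoptable` and a mock in place of the real stats client. Only the counter name, the tag, and the `"\n\t".join(...)` call come from the snippet above; everything else is illustrative.

```python
from unittest import mock


def report_unadoptable(stats, to_reset, reset_tis_message):
    """Hypothetical helper: emit the counter and build the log text under one guard."""
    task_instance_str = None
    if to_reset:
        # Count every task instance that could not be adopted and was reset.
        stats.incr(
            "scheduler.zombies.detected",
            len(to_reset),
            tags={"reason": "adopt_failure"},
        )
        # Same join as in the reviewed snippet; what is done with it afterwards is not shown here.
        task_instance_str = "\n\t".join(reset_tis_message)
    return task_instance_str


stats = mock.MagicMock()  # stand-in for the real stats client (assumption)
message = report_unadoptable(stats, ["ti_a", "ti_b"], ["<ti_a>", "<ti_b>"])
```

With a single guard, the metric and the log message cannot drift apart if the condition is changed later.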
    -return len(event_buffer)
    +stats.gauge("scheduler.executor_events.batch_size", num_events)
    +stats.incr("scheduler.executor_events.processed", num_events)
    +return num_events

This changes the behavior of the existing method: `event_buffer.pop(...)` shrinks the buffer as events are processed, so the original `return len(event_buffer)` returns the remaining count, not the total. That value feeds `num_finished_events` and the scheduler's idle detection, so let's keep the behavior unchanged here. It might be a pre-existing bug, but I'll deep-dive and fix it in a separate PR if needed.
Keep `num_events` for the metric emission, but return `len(event_buffer)` in all three return statements (the two early returns and the final one).
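The difference between the two return values can be illustrated with a simplified model. The function `drain_events` and its dictionary buffer are hypothetical and are not the scheduler's actual code; the sketch assumes every event is popped during processing.

```python
from typing import Dict, Tuple


def drain_events(event_buffer: Dict[str, str]) -> Tuple[int, int]:
    """Pop every event; return (count captured up front, count left in the buffer)."""
    num_events = len(event_buffer)  # total, captured before any processing
    for key in list(event_buffer):
        event_buffer.pop(key)  # processing an event removes it from the buffer
    return num_events, len(event_buffer)


total, remaining = drain_events({"ti_1": "success", "ti_2": "failed", "ti_3": "success"})
print(total, remaining)  # prints "3 0"
```

In this model a caller that previously received the remaining count would start receiving the total, which is the behavior change the comment asks to avoid.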
Emit three new metrics from SchedulerJobRunner to improve visibility
into scheduler health:
- scheduler.loop_exceptions{exception_class}: counter incremented when
the scheduler loop exits with an unhandled exception, tagged with the
exception class to aid triaging crash loops.
- scheduler.executor_events.batch_size (gauge) and
scheduler.executor_events.processed (counter): emitted on each
successful call to process_executor_events with the event count.
scheduler.executor_events.failed{reason} is incremented instead when
the call raises, tagged with the exception class.
The original body is extracted into _process_executor_events_core so
process_executor_events can act as a thin metrics wrapper.
- scheduler.zombies.detected{reason}: counter incremented when zombie
task instances are detected, tagged with the detection path:
- heartbeat_timeout: task exceeded the heartbeat threshold
- adopt_failure: orphaned task could not be re-adopted and was reset
These metrics are already running in production via local monkey-patches
at Datadog; this commit contributes them natively upstream so the
patches can eventually be removed.
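As a rough illustration of the "thin metrics wrapper" idea described above, here is a self-contained sketch. `RecordingStats` and `process_with_metrics` are hypothetical names invented for this example; only the metric names and the `reason` tag come from the commit message.

```python
from collections import Counter


class RecordingStats:
    """Hypothetical in-memory stand-in for a stats client."""

    def __init__(self):
        self.counters = Counter()
        self.gauges = {}

    def incr(self, name, count=1, tags=None):
        # Key counters by name plus sorted tags so tagged series stay separate.
        key = (name, tuple(sorted((tags or {}).items())))
        self.counters[key] += count

    def gauge(self, name, value):
        self.gauges[name] = value


def process_with_metrics(stats, core, events):
    """Call `core(events)` and emit success or failure metrics around it."""
    try:
        num_events = core(events)
    except Exception as exc:
        # Failure path: tag the counter with the exception class, then re-raise.
        stats.incr(
            "scheduler.executor_events.failed",
            tags={"reason": type(exc).__name__},
        )
        raise
    # Success path: record the batch size as a gauge and as a running counter.
    stats.gauge("scheduler.executor_events.batch_size", num_events)
    stats.incr("scheduler.executor_events.processed", num_events)
    return num_events


stats = RecordingStats()
processed = process_with_metrics(stats, len, ["e1", "e2", "e3"])
```

Keeping the core logic in a separate callable means the wrapper only has to decide which metric to emit and never swallows the exception.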
… metric

Three existing tests used mock_stats.incr.assert_not_called() to verify that no error/mismatch metrics were emitted in requeued-TI and stale-success scenarios. The new executor_events.processed counter now fires unconditionally for every _process_executor_events call, so those blanket assertions became too broad. Replace each with an assertion that permits the expected counter while still verifying that no scheduler.tasks.killed_externally metric fired.
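The narrower assertion described above might look roughly like the following sketch. The helper `emitted_counter_names` is a hypothetical name, and the mock is driven by hand here instead of by the scheduler.

```python
from unittest import mock


def emitted_counter_names(mock_stats):
    """Return the metric name (first positional argument) of every incr() call."""
    return [call.args[0] for call in mock_stats.incr.call_args_list]


mock_stats = mock.MagicMock()
# Simulate the counter that now fires on every successful call.
mock_stats.incr("scheduler.executor_events.processed", 2)

names = emitted_counter_names(mock_stats)
# The expected counter is allowed; the error metric must still be absent.
assert "scheduler.tasks.killed_externally" not in names
assert all(name == "scheduler.executor_events.processed" for name in names)
```

Unlike `assert_not_called()`, this check keeps passing when unrelated counters are added later, provided the allowed set is updated deliberately.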
Force-pushed from 79f3197 to 18648f9.
Emit three new metrics from `SchedulerJobRunner` to improve visibility into scheduler health. These metrics have been running in production at Datadog via local monkey-patches and are contributed upstream so the patches can eventually be removed.
New metrics
- `scheduler.loop_exceptions{exception_class}`: counter incremented when the scheduler loop exits with an unhandled exception, tagged with the exception class. Emitted from the existing `except Exception` block in `_execute`.
- `scheduler.executor_events.batch_size` (gauge) and `scheduler.executor_events.processed` (counter): emitted on every successful call to `process_executor_events` with the size of the event batch. `scheduler.executor_events.failed{reason}` is incremented when the call raises, tagged with the exception class.
- `scheduler.zombies.detected{reason}`: counter incremented when zombie task instances are detected, tagged by detection path:
  - `heartbeat_timeout`: task exceeded the heartbeat threshold
  - `adopt_failure`: orphaned task could not be re-adopted and was reset
Tests
New test class `TestSchedulerObservabilityMetrics` in `airflow-core/tests/unit/jobs/test_scheduler_job.py` covers all three metrics (success and failure paths).
Was generative AI tooling used to co-author this PR?