Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1223,12 +1223,16 @@ def _is_tracing_enabled():
return conf.getboolean("traces", "otel_on")

def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int:
    """
    Process the buffered events of ``executor`` on behalf of this scheduler job.

    Delegates to ``SchedulerJobRunner.process_executor_events``, passing this runner's
    ``job.id`` and ``scheduler_dag_bag``, and returns whatever that call returns.

    :param executor: executor handed to ``process_executor_events``
    :param session: session forwarded to ``process_executor_events``
    :return: the value returned by ``process_executor_events``
    :raises Exception: any exception raised by ``process_executor_events`` is re-raised
        unchanged after the ``scheduler.executor_events.failed`` counter is incremented
    """
    try:
        return SchedulerJobRunner.process_executor_events(
            executor=executor,
            job_id=self.job.id,
            scheduler_dag_bag=self.scheduler_dag_bag,
            session=session,
        )
    except Exception as exc:
        # Count the failure, tagged with the exception class name, then let the
        # original exception propagate to the caller untouched.
        stats.incr("scheduler.executor_events.failed", tags={"reason": type(exc).__name__})
        raise
Comment on lines +1233 to +1235

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you use two different tag keys for the same data — `reason` on scheduler.executor_events.failed and `exception_class` on scheduler.loop_exceptions?


@classmethod
def process_executor_events(
Expand Down Expand Up @@ -1266,6 +1270,7 @@ def process_executor_events(
"""
ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] = {}
event_buffer = executor.get_event_buffer()
num_events = len(event_buffer)
tis_with_right_state: list[TaskInstanceKey] = []
callback_keys_with_events: list[CallbackKey] = []

Expand Down Expand Up @@ -1327,11 +1332,15 @@ def process_executor_events(

# Return if no finished tasks
if not tis_with_right_state:
stats.gauge("scheduler.executor_events.batch_size", num_events)
stats.incr("scheduler.executor_events.processed", num_events)
return len(event_buffer)

# Check state of finished tasks
filter_for_tis = TI.filter_for_tis(tis_with_right_state)
if filter_for_tis is None:
stats.gauge("scheduler.executor_events.batch_size", num_events)
stats.incr("scheduler.executor_events.processed", num_events)
return len(event_buffer)
asset_loader, alias_loader = _eager_load_dag_run_for_validation()
query = (
Expand Down Expand Up @@ -1548,6 +1557,8 @@ def process_executor_events(
# Update task state - emails are handled by DAG processor now
ti.handle_failure(error=msg, session=session)

stats.gauge("scheduler.executor_events.batch_size", num_events)
stats.incr("scheduler.executor_events.processed", num_events)
return len(event_buffer)

def _execute(self) -> int | None:
Expand Down Expand Up @@ -1583,7 +1594,8 @@ def _execute(self) -> int | None:

if settings.Session is not None:
settings.Session.remove()
except Exception:
except Exception as exc:
stats.incr("scheduler.loop_exceptions", tags={"exception_class": type(exc).__name__})
self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
raise
finally:
Expand Down Expand Up @@ -3221,8 +3233,12 @@ def adopt_or_reset_orphaned_tasks(self, *, session: Session = NEW_SESSION) -> in

stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset))
stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset))

if to_reset:
stats.incr(
"scheduler.zombies.detected",
len(to_reset),
tags={"reason": "adopt_failure"},
)
Comment on lines +3237 to +3241

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe there is a distinction between a zombie task (a task that remains stuck in a running state even though its associated job is inactive) and an orphaned task (a task that has lost its executor). This metric appears to track orphaned tasks that the executor failed to adopt for any reason.

Since this overlaps with scheduler.orphaned_tasks.cleared, I would prefer to remove the new metric.

task_instance_str = "\n\t".join(reset_tis_message)
Comment on lines +3236 to 3242
Comment on lines +3236 to 3242

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can be merged into a single block.

@safaehar safaehar Jun 11, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 79f3197

self.log.info(
"Reset the following %s orphaned TaskInstances:\n\t%s",
Expand Down Expand Up @@ -3369,6 +3385,11 @@ def _find_and_purge_task_instances_without_heartbeats(self) -> None:
if task_instances_without_heartbeats := self._find_task_instances_without_heartbeats(
session=session
):
stats.incr(
"scheduler.zombies.detected",
len(task_instances_without_heartbeats),
tags={"reason": "heartbeat_timeout"},
)
self._purge_task_instances_without_heartbeats(
task_instances_without_heartbeats, session=session
)
Expand Down
166 changes: 163 additions & 3 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,9 @@ def test_process_executor_events_ti_requeued(
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
self.job_runner.executor.callback_sink.send.assert_not_called()
mock_stats.incr.assert_not_called()
# Only the processed-events counter should have fired across all three sub-tests;
# no killed_externally mismatch metric should appear.
assert all(c.args[0] == "scheduler.executor_events.processed" for c in mock_stats.incr.call_args_list)

@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow._shared.observability.metrics.stats._get_backend")
Expand Down Expand Up @@ -905,7 +907,9 @@ def test_process_executor_events_stale_success_when_scheduled_after_defer(
ti1.refresh_from_db(session=session)
assert ti1.state == State.SCHEDULED
self.job_runner.executor.callback_sink.send.assert_not_called()
mock_stats.incr.assert_not_called()
# Stale success from defer exit must not trigger a mismatch metric —
# only the standard processed-events counter should fire.
mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", count=1)

# Without next_method, scheduled + stale success is still a mismatch (e.g. external kill).
ti1.next_method = None
Expand Down Expand Up @@ -957,7 +961,9 @@ def test_process_executor_events_multiple_try_numbers_warns(
for rec in caplog.records
)
mock_task_callback.assert_not_called()
mock_stats.incr.assert_not_called()
# Only the processed-events counter should fire; duplicate try_number events
# must not trigger any error/mismatch metrics.
mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", count=2)

@pytest.mark.usefixtures("testing_dag_bundle")
def test_process_executor_events_with_asset_events(self, session, dag_maker):
Expand Down Expand Up @@ -11942,3 +11948,157 @@ def test_resolve_partition_date(mappers, partition_key, expected):
dag_id="test-dag",
)
assert result == expected


class TestSchedulerObservabilityMetrics:
    """Verify the observability metrics that scheduler_job_runner.py reports through ``stats``."""

    @pytest.fixture(autouse=True)
    def per_test(self) -> Generator:
        """Clean the database around every test and reset the runner under test."""
        _clean_db()
        self.job_runner: SchedulerJobRunner | None = None
        yield
        _clean_db()

    @staticmethod
    def _session_context():
        """Return a stand-in for ``create_session()`` whose ``with`` block yields a mocked session."""
        ctx = MagicMock()
        ctx.__enter__.return_value = MagicMock()
        # A falsy __exit__ result means exceptions raised inside the block are not swallowed.
        ctx.__exit__.return_value = False
        return ctx

    # ------------------------------------------------------------------
    # scheduler.loop_exceptions
    # ------------------------------------------------------------------

    def test_loop_exceptions_incr_on_scheduler_loop_failure(self):
        """A crash in _run_scheduler_loop increments scheduler.loop_exceptions, tagged by exception class."""
        self.job_runner = SchedulerJobRunner(job=Job())
        runner = self.job_runner

        with (
            mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats,
            mock.patch.object(runner, "register_signals", return_value=MagicMock()),
            mock.patch.object(runner.executor, "start"),
            mock.patch.object(runner.executor, "end"),
            mock.patch.object(runner, "_run_scheduler_loop", side_effect=RuntimeError("loop crash")),
            pytest.raises(RuntimeError, match="loop crash"),
        ):
            runner._execute()

        mock_stats.incr.assert_any_call("scheduler.loop_exceptions", tags={"exception_class": "RuntimeError"})

    def test_loop_exceptions_not_emitted_on_clean_exit(self):
        """A normal return from _run_scheduler_loop must not increment scheduler.loop_exceptions."""
        self.job_runner = SchedulerJobRunner(job=Job())
        runner = self.job_runner

        with (
            mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats,
            mock.patch.object(runner, "register_signals", return_value=MagicMock()),
            mock.patch.object(runner.executor, "start"),
            mock.patch.object(runner.executor, "end"),
            mock.patch.object(runner, "_run_scheduler_loop"),
        ):
            runner._execute()

        incremented = {call.args[0] for call in mock_stats.incr.call_args_list}
        assert "scheduler.loop_exceptions" not in incremented

    # ------------------------------------------------------------------
    # scheduler.executor_events.{batch_size,processed,failed}
    # ------------------------------------------------------------------

    def test_executor_events_batch_metrics_emitted_on_success(self):
        """The batch_size gauge and processed counter are reported on the early-return path."""
        # With an empty event buffer nothing reaches tis_with_right_state, so the call
        # returns early and reports a batch of zero events.
        executor = MagicMock()
        executor.get_event_buffer.return_value = {}

        with mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats:
            result = SchedulerJobRunner.process_executor_events(
                executor=executor,
                job_id=1,
                scheduler_dag_bag=MagicMock(),
                session=MagicMock(),
            )

        assert result == 0
        mock_stats.gauge.assert_called_once_with("scheduler.executor_events.batch_size", 0)
        mock_stats.incr.assert_called_once_with("scheduler.executor_events.processed", 0)

    def test_executor_events_failed_metric_emitted_on_exception(self):
        """_process_executor_events reports the failed counter when process_executor_events raises."""
        self.job_runner = SchedulerJobRunner(job=Job())
        failing_call = mock.patch.object(
            SchedulerJobRunner, "process_executor_events", side_effect=ValueError("executor boom")
        )

        with (
            mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats,
            failing_call,
            pytest.raises(ValueError, match="executor boom"),
        ):
            self.job_runner._process_executor_events(executor=MagicMock(), session=MagicMock())

        mock_stats.incr.assert_called_once_with(
            "scheduler.executor_events.failed", tags={"reason": "ValueError"}
        )
        mock_stats.gauge.assert_not_called()

    # ------------------------------------------------------------------
    # scheduler.zombies.detected
    # ------------------------------------------------------------------

    def test_zombies_detected_heartbeat_timeout_emitted(self):
        """Task instances found without heartbeats are counted with reason=heartbeat_timeout."""
        self.job_runner = SchedulerJobRunner(job=Job())
        stale_tis = [MagicMock() for _ in range(3)]
        session_ctx = self._session_context()

        with (
            mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats,
            mock.patch("airflow.jobs.scheduler_job_runner.create_session", return_value=session_ctx),
            mock.patch.object(
                self.job_runner, "_find_task_instances_without_heartbeats", return_value=stale_tis
            ),
            mock.patch.object(self.job_runner, "_purge_task_instances_without_heartbeats"),
        ):
            self.job_runner._find_and_purge_task_instances_without_heartbeats()

        mock_stats.incr.assert_called_once_with(
            "scheduler.zombies.detected", 3, tags={"reason": "heartbeat_timeout"}
        )

    def test_zombies_detected_not_emitted_when_no_heartbeat_timeout(self):
        """No counter is incremented when the heartbeat search finds nothing."""
        self.job_runner = SchedulerJobRunner(job=Job())
        session_ctx = self._session_context()

        with (
            mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats,
            mock.patch("airflow.jobs.scheduler_job_runner.create_session", return_value=session_ctx),
            mock.patch.object(self.job_runner, "_find_task_instances_without_heartbeats", return_value=[]),
        ):
            self.job_runner._find_and_purge_task_instances_without_heartbeats()

        mock_stats.incr.assert_not_called()

    def test_zombies_detected_adopt_failure_emitted(self, dag_maker, session):
        """A queued task instance that gets reset is counted with reason=adopt_failure."""
        with dag_maker(dag_id="test_zombie_adopt_failure", schedule="@daily"):
            EmptyOperator(task_id="task1")

        # Persist the job that the task instance will be recorded as queued by.
        previous_job = Job()
        session.add(previous_job)
        session.commit()

        self.job_runner = SchedulerJobRunner(job=Job())

        dag_run = dag_maker.create_dagrun(run_type=DagRunType.MANUAL)
        task_instance = dag_run.get_task_instances(session=session)[0]
        task_instance.state = TaskInstanceState.QUEUED
        task_instance.queued_by_job_id = previous_job.id
        session.merge(task_instance)
        session.commit()

        with mock.patch("airflow.jobs.scheduler_job_runner.stats") as mock_stats:
            num_reset = self.job_runner.adopt_or_reset_orphaned_tasks(session=session)

        assert num_reset == 1
        mock_stats.incr.assert_any_call("scheduler.zombies.detected", 1, tags={"reason": "adopt_failure"})
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,33 @@ metrics:
legacy_name: "-"
name_variables: []

- name: "scheduler.loop_exceptions"
description: "Number of times the scheduler loop exited with an unhandled exception.
Metric with exception_class tagging."
type: "counter"
legacy_name: "-"
name_variables: []

- name: "scheduler.executor_events.processed"
description: "Number of executor events processed per ``process_executor_events`` call."
type: "counter"
legacy_name: "-"
name_variables: []

- name: "scheduler.executor_events.failed"
description: "Number of times ``process_executor_events`` raised an exception.
Metric with reason tagging (the raised exception's class name)."
type: "counter"
legacy_name: "-"
name_variables: []

- name: "scheduler.zombies.detected"
description: "Number of zombie task instances detected by the Scheduler.
Metric with reason tagging (``heartbeat_timeout`` or ``adopt_failure``)."
type: "counter"
legacy_name: "-"
name_variables: []

- name: "scheduler.critical_section_busy"
description: "Count of times a scheduler process tried to get a lock on the critical
section (needed to send tasks to the executor) and found it locked by another process."
Expand Down Expand Up @@ -394,6 +421,12 @@ metrics:
legacy_name: "-"
name_variables: []

- name: "scheduler.executor_events.batch_size"
description: "Number of executor events in the batch processed per ``process_executor_events`` call."
type: "gauge"
legacy_name: "-"
name_variables: []

- name: "scheduler.tasks.executable"
description: "Number of tasks that are ready for execution (set to queued) with respect to pool limits,
Dag concurrency, executor state, and priority."
Expand Down
Loading