From 0c3696af896536f1a6e6faced5368bff3807cffb Mon Sep 17 00:00:00 2001 From: Nick Stenning Date: Fri, 20 Feb 2026 15:18:33 +0100 Subject: [PATCH] Fix broken `dag_processing.total_parse_time` metric DagFileProcessorManager has been emitting a nonsense value for `dag_processing.total_parse_time` since 8774f28d76, which reversed the order in which `emit_metrics` and `prepare_file_queue` (then called `prepare_file_path_queue`) were called. As `prepare_file_path_queue` was responsible for resetting the value of `self._parsing_start_time`, the assumption made by `emit_metrics` was that it would be called once the file queue had been cleared, but crucially before `prepare_file_queue` was called to refill the queue. Additionally, there was no guarantee that we'd parsed any files at all since the last time the metric was emitted. If no work was due, we'd gladly emit near-zero metrics every time around the while loop. I've rearranged things in such a way that I hope will be harder to accidentally break in future: - `self._parsing_start_time` may be reset whenever files are added to the queue, if it was not set already. - metrics are emitted when `prepare_file_queue` is called -- when the queue is empty -- but only if `self._parsing_start_time` is set, meaning only if we've actually parsed any files since the last time metrics were emitted. Together, this means we should now emit metrics once per parsing loop. I've added a test which fails on main and passes on this branch. --- .../src/airflow/dag_processing/manager.py | 101 +++++++++++------- airflow-core/src/airflow/typing_compat.py | 6 +- .../tests/unit/dag_processing/test_manager.py | 31 +++++- 3 files changed, 93 insertions(+), 45 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index c0dff72880602..ba4715a8ae7a7 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -36,7 +36,7 @@ from datetime import datetime, timedelta from operator import attrgetter, itemgetter from pathlib import Path -from typing import TYPE_CHECKING, Any, NamedTuple, cast +from typing import TYPE_CHECKING, Any, Literal, NamedTuple, cast import attrs import structlog @@ -65,6 +65,7 @@ from airflow.observability.trace import DebugTrace from airflow.sdk import SecretCache from airflow.sdk.log import init_log_file, logging_processors +from airflow.typing_compat import assert_never from airflow.utils.file import list_py_file_paths, might_contain_dag from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname @@ -208,7 +209,7 @@ class DagFileProcessorManager(LoggingMixin): _processors: dict[DagFileInfo, DagFileProcessorProcess] = attrs.field(factory=dict, init=False) - _parsing_start_time: float = attrs.field(init=False) + _parsing_start_time: float | None = attrs.field(default=None, init=False) _num_run: int = attrs.field(default=0, init=False) _callback_to_execute: dict[DagFileInfo, list[CallbackRequest]] = attrs.field( @@ -395,7 +396,6 @@ def _run_parsing_loop(self): # clear down, we must have cleared all files found from scanning the dags dir _and_ have # cleared all files added as a result of callbacks self.prepare_file_queue(known_files=known_files) - self.emit_metrics() self._start_new_processes() @@ -453,16 +453,8 @@ def _service_processor_sockets(self, timeout: float | None = 1.0): def _queue_requested_files_for_parsing(self) -> None: """Queue any files requested for parsing as 
requested by users via UI/API.""" files = self._get_priority_files() - bundles_to_refresh: set[str] = set() - for file in files: - # Try removing the file if already present - with contextlib.suppress(ValueError): - self._file_queue.remove(file) - # enqueue file to the start of the queue. - self._file_queue.appendleft(file) - bundles_to_refresh.add(file.bundle_name) - - self._force_refresh_bundles |= bundles_to_refresh + self._add_files_to_queue(files, mode="frontprio") + self._force_refresh_bundles |= {file.bundle_name for file in files} if self._force_refresh_bundles: self.log.info("Bundles being force refreshed: %s", ", ".join(self._force_refresh_bundles)) @@ -544,7 +536,7 @@ def _add_callback_to_queue(self, request: CallbackRequest): bundle_version=request.bundle_version, ) self._callback_to_execute[file_info].append(request) - self._add_files_to_queue([file_info], True) + self._add_files_to_queue([file_info], mode="front") Stats.incr("dag_processing.other_callback_count") def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]): @@ -1021,7 +1013,7 @@ def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]): if new_files: self.log.info("Adding %d new files to the front of the queue", len(new_files)) - self._add_files_to_queue(new_files, True) + self._add_files_to_queue(new_files, mode="front") def _resort_file_queue(self): if self._file_parsing_sort_mode == "modified_time" and self._file_queue: @@ -1076,7 +1068,15 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]): Note this method is only called when the file path queue is empty """ - self._parsing_start_time = time.perf_counter() + # We only emit metrics after processing all files in the queue. If `self._parsing_start_time` is None + # when this method is called, no files have yet been added to the queue so we shouldn't emit metrics. + if self._parsing_start_time is not None: + emit_metrics( + parse_time=time.perf_counter() - self._parsing_start_time, + stats=list(self._file_stats.values()), + ) + self._parsing_start_time = None + # If the file path is already being processed, or if a file was # processed recently, wait until the next batch in_progress = set(self._processors) @@ -1122,7 +1122,7 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]): "Queuing the following files for processing:\n\t%s", "\n\t".join(str(f.rel_path) for f in to_queue), ) - self._add_files_to_queue(to_queue, False) + self._add_files_to_queue(to_queue, mode="back") Stats.incr("dag_processing.file_path_queue_update_count") def _kill_timed_out_processors(self): @@ -1160,13 +1160,34 @@ def _kill_timed_out_processors(self): processor = self._processors.pop(proc) processor.logger_filehandle.close() - def _add_files_to_queue(self, files: list[DagFileInfo], add_at_front: bool): + def _add_files_to_queue( + self, + files: list[DagFileInfo], + *, + mode: Literal["front", "back", "frontprio"], + ): """Add stuff to the back or front of the file queue, unless it's already present.""" - new_files = list(f for f in files if f not in self._file_queue) - if add_at_front: + if mode == "frontprio": + for file in files: + # Try removing the file if already present + with contextlib.suppress(ValueError): + self._file_queue.remove(file) + # enqueue file to the start of the queue. 
+ self._file_queue.appendleft(file) + elif mode == "front": + new_files = list(f for f in files if f not in self._file_queue) self._file_queue.extendleft(new_files) - else: + elif mode == "back": + new_files = list(f for f in files if f not in self._file_queue) self._file_queue.extend(new_files) + else: + assert_never(mode) + + # If we've just added files to the queue for the first time since metrics were last emitted, reset the + # parse time counter. + if self._parsing_start_time is None and self._file_queue: + self._parsing_start_time = time.perf_counter() + Stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue)) def max_runs_reached(self): @@ -1193,27 +1214,25 @@ def end(self): if pids_to_kill: kill_child_processes_by_pids(pids_to_kill) - def emit_metrics(self): - """ - Emit metrics about dag parsing summary. - This is called once every time around the parsing "loop" - i.e. after - all files have been parsed. - """ - with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span: - parse_time = time.perf_counter() - self._parsing_start_time - Stats.gauge("dag_processing.total_parse_time", parse_time) - Stats.gauge("dagbag_size", sum(stat.num_dags for stat in self._file_stats.values())) - Stats.gauge( - "dag_processing.import_errors", sum(stat.import_errors for stat in self._file_stats.values()) - ) - span.set_attributes( - { - "total_parse_time": parse_time, - "dag_bag_size": sum(stat.num_dags for stat in self._file_stats.values()), - "import_errors": sum(stat.import_errors for stat in self._file_stats.values()), - } - ) +def emit_metrics(*, parse_time: float, stats: Sequence[DagFileStat]): + """ + Emit metrics about dag parsing summary. + + This is called once every time around the parsing "loop" - i.e. after + all files have been parsed. 
+ """ + with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span: + Stats.gauge("dag_processing.total_parse_time", parse_time) + Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats)) + Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats)) + span.set_attributes( + { + "total_parse_time": parse_time, + "dag_bag_size": sum(stat.num_dags for stat in stats), + "import_errors": sum(stat.import_errors for stat in stats), + } + ) def process_parse_results( diff --git a/airflow-core/src/airflow/typing_compat.py b/airflow-core/src/airflow/typing_compat.py index e1efb87067b34..31b6e6ecaa5a0 100644 --- a/airflow-core/src/airflow/typing_compat.py +++ b/airflow-core/src/airflow/typing_compat.py @@ -19,7 +19,7 @@ from __future__ import annotations -__all__ = ["Literal", "ParamSpec", "Self", "TypeAlias", "TypeGuard", "Unpack"] +__all__ = ["Literal", "ParamSpec", "Self", "TypeAlias", "TypeGuard", "Unpack", "assert_never"] import sys @@ -27,6 +27,6 @@ from typing import Literal, ParamSpec, TypeAlias, TypeGuard if sys.version_info >= (3, 11): - from typing import Self, Unpack + from typing import Self, Unpack, assert_never else: - from typing_extensions import Self, Unpack + from typing_extensions import Self, Unpack, assert_never diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index dd8eb88cdb713..edfc3a505f01c 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -26,7 +26,7 @@ import signal import textwrap import time -from collections import deque +from collections import defaultdict, deque from datetime import datetime, timedelta from pathlib import Path from socket import socket, socketpair @@ -1591,3 +1591,32 @@ def test_stats_initialize_called_on_run(self, stats_init_mock, tmp_path, configu stats_init_mock.assert_called_once() call_kwargs = stats_init_mock.call_args.kwargs assert "factory" in call_kwargs + + @mock.patch("airflow.dag_processing.manager.Stats.gauge") + def test_stats_total_parse_time(self, statsd_gauge_mock, tmp_path, configure_testing_dag_bundle): + key = "dag_processing.total_parse_time" + gauge_values = defaultdict(list) + statsd_gauge_mock.side_effect = lambda name, value: gauge_values[name].append(value) + + dag_path = tmp_path / "temp_dag.py" + dag_code = textwrap.dedent( + """ + from airflow import DAG + dag = DAG(dag_id='temp_dag') + """ + ) + dag_path.write_text(dag_code) + + with configure_testing_dag_bundle(tmp_path): + manager = DagFileProcessorManager(max_runs=0) + + for _ in range(3): + manager.max_runs += 1 + manager.run() + + assert key in gauge_values + assert len(gauge_values[key]) == 1 + assert gauge_values[key][0] >= 1e-4 + + dag_path.touch() # make the loop run faster + gauge_values.clear()