diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index c0dff72880602..ba4715a8ae7a7 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -36,7 +36,7 @@
 from datetime import datetime, timedelta
 from operator import attrgetter, itemgetter
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, NamedTuple, cast
+from typing import TYPE_CHECKING, Any, Literal, NamedTuple, cast
 
 import attrs
 import structlog
@@ -65,6 +65,7 @@
 from airflow.observability.trace import DebugTrace
 from airflow.sdk import SecretCache
 from airflow.sdk.log import init_log_file, logging_processors
+from airflow.typing_compat import assert_never
 from airflow.utils.file import list_py_file_paths, might_contain_dag
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
@@ -208,7 +209,7 @@ class DagFileProcessorManager(LoggingMixin):
 
     _processors: dict[DagFileInfo, DagFileProcessorProcess] = attrs.field(factory=dict, init=False)
 
-    _parsing_start_time: float = attrs.field(init=False)
+    _parsing_start_time: float | None = attrs.field(default=None, init=False)
     _num_run: int = attrs.field(default=0, init=False)
 
     _callback_to_execute: dict[DagFileInfo, list[CallbackRequest]] = attrs.field(
@@ -395,7 +396,6 @@ def _run_parsing_loop(self):
                 # clear down, we must have cleared all files found from scanning the dags dir _and_ have
                 # cleared all files added as a result of callbacks
                 self.prepare_file_queue(known_files=known_files)
-                self.emit_metrics()
 
             self._start_new_processes()
 
@@ -453,16 +453,8 @@ def _service_processor_sockets(self, timeout: float | None = 1.0):
     def _queue_requested_files_for_parsing(self) -> None:
         """Queue any files requested for parsing as requested by users via UI/API."""
         files = self._get_priority_files()
-        bundles_to_refresh: set[str] = set()
-        for file in files:
-            # Try removing the file if already present
-            with contextlib.suppress(ValueError):
-                self._file_queue.remove(file)
-            # enqueue file to the start of the queue.
-            self._file_queue.appendleft(file)
-            bundles_to_refresh.add(file.bundle_name)
-
-        self._force_refresh_bundles |= bundles_to_refresh
+        self._add_files_to_queue(files, mode="frontprio")
+        self._force_refresh_bundles |= {file.bundle_name for file in files}
         if self._force_refresh_bundles:
             self.log.info("Bundles being force refreshed: %s", ", ".join(self._force_refresh_bundles))
 
@@ -544,7 +536,7 @@ def _add_callback_to_queue(self, request: CallbackRequest):
                 bundle_version=request.bundle_version,
             )
             self._callback_to_execute[file_info].append(request)
-            self._add_files_to_queue([file_info], True)
+            self._add_files_to_queue([file_info], mode="front")
             Stats.incr("dag_processing.other_callback_count")
 
     def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
@@ -1021,7 +1013,7 @@ def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
 
         if new_files:
            self.log.info("Adding %d new files to the front of the queue", len(new_files))
-            self._add_files_to_queue(new_files, True)
+            self._add_files_to_queue(new_files, mode="front")
 
     def _resort_file_queue(self):
         if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
@@ -1076,7 +1068,15 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
 
         Note this method is only called when the file path queue is empty
         """
-        self._parsing_start_time = time.perf_counter()
+        # We only emit metrics after processing all files in the queue. If `self._parsing_start_time` is None
+        # when this method is called, no files have yet been added to the queue so we shouldn't emit metrics.
+        if self._parsing_start_time is not None:
+            emit_metrics(
+                parse_time=time.perf_counter() - self._parsing_start_time,
+                stats=list(self._file_stats.values()),
+            )
+            self._parsing_start_time = None
+
         # If the file path is already being processed, or if a file was
         # processed recently, wait until the next batch
         in_progress = set(self._processors)
@@ -1122,7 +1122,7 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
             "Queuing the following files for processing:\n\t%s",
             "\n\t".join(str(f.rel_path) for f in to_queue),
         )
-        self._add_files_to_queue(to_queue, False)
+        self._add_files_to_queue(to_queue, mode="back")
         Stats.incr("dag_processing.file_path_queue_update_count")
 
     def _kill_timed_out_processors(self):
@@ -1160,13 +1160,34 @@ def _kill_timed_out_processors(self):
             processor = self._processors.pop(proc)
             processor.logger_filehandle.close()
 
-    def _add_files_to_queue(self, files: list[DagFileInfo], add_at_front: bool):
+    def _add_files_to_queue(
+        self,
+        files: list[DagFileInfo],
+        *,
+        mode: Literal["front", "back", "frontprio"],
+    ):
         """Add stuff to the back or front of the file queue, unless it's already present."""
-        new_files = list(f for f in files if f not in self._file_queue)
-        if add_at_front:
+        if mode == "frontprio":
+            for file in files:
+                # Try removing the file if already present
+                with contextlib.suppress(ValueError):
+                    self._file_queue.remove(file)
+                # enqueue file to the start of the queue.
+                self._file_queue.appendleft(file)
+        elif mode == "front":
+            new_files = list(f for f in files if f not in self._file_queue)
             self._file_queue.extendleft(new_files)
-        else:
+        elif mode == "back":
+            new_files = list(f for f in files if f not in self._file_queue)
             self._file_queue.extend(new_files)
+        else:
+            assert_never(mode)
+
+        # If we've just added files to the queue for the first time since metrics were last emitted, reset the
+        # parse time counter.
+        if self._parsing_start_time is None and self._file_queue:
+            self._parsing_start_time = time.perf_counter()
+
         Stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue))
 
     def max_runs_reached(self):
@@ -1193,27 +1214,25 @@ def end(self):
         if pids_to_kill:
             kill_child_processes_by_pids(pids_to_kill)
 
-    def emit_metrics(self):
-        """
-        Emit metrics about dag parsing summary.
-        This is called once every time around the parsing "loop" - i.e. after
-        all files have been parsed.
-        """
-        with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
-            parse_time = time.perf_counter() - self._parsing_start_time
-            Stats.gauge("dag_processing.total_parse_time", parse_time)
-            Stats.gauge("dagbag_size", sum(stat.num_dags for stat in self._file_stats.values()))
-            Stats.gauge(
-                "dag_processing.import_errors", sum(stat.import_errors for stat in self._file_stats.values())
-            )
-            span.set_attributes(
-                {
-                    "total_parse_time": parse_time,
-                    "dag_bag_size": sum(stat.num_dags for stat in self._file_stats.values()),
-                    "import_errors": sum(stat.import_errors for stat in self._file_stats.values()),
-                }
-            )
+def emit_metrics(*, parse_time: float, stats: Sequence[DagFileStat]):
+    """
+    Emit metrics about dag parsing summary.
+
+    This is called once every time around the parsing "loop" - i.e. after
+    all files have been parsed.
+    """
+    with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
+        Stats.gauge("dag_processing.total_parse_time", parse_time)
+        Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats))
+        Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats))
+        span.set_attributes(
+            {
+                "total_parse_time": parse_time,
+                "dag_bag_size": sum(stat.num_dags for stat in stats),
+                "import_errors": sum(stat.import_errors for stat in stats),
+            }
+        )
 
 
 def process_parse_results(
diff --git a/airflow-core/src/airflow/typing_compat.py b/airflow-core/src/airflow/typing_compat.py
index e1efb87067b34..31b6e6ecaa5a0 100644
--- a/airflow-core/src/airflow/typing_compat.py
+++ b/airflow-core/src/airflow/typing_compat.py
@@ -19,7 +19,7 @@
 
 from __future__ import annotations
 
-__all__ = ["Literal", "ParamSpec", "Self", "TypeAlias", "TypeGuard", "Unpack"]
+__all__ = ["Literal", "ParamSpec", "Self", "TypeAlias", "TypeGuard", "Unpack", "assert_never"]
 
 import sys
 
@@ -27,6 +27,6 @@
 from typing import Literal, ParamSpec, TypeAlias, TypeGuard
 
 if sys.version_info >= (3, 11):
-    from typing import Self, Unpack
+    from typing import Self, Unpack, assert_never
 else:
-    from typing_extensions import Self, Unpack
+    from typing_extensions import Self, Unpack, assert_never
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index dd8eb88cdb713..edfc3a505f01c 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -26,7 +26,7 @@
 import signal
 import textwrap
 import time
-from collections import deque
+from collections import defaultdict, deque
 from datetime import datetime, timedelta
 from pathlib import Path
 from socket import socket, socketpair
@@ -1591,3 +1591,32 @@ def test_stats_initialize_called_on_run(self, stats_init_mock, tmp_path, configu
         stats_init_mock.assert_called_once()
         call_kwargs = stats_init_mock.call_args.kwargs
         assert "factory" in call_kwargs
+
+    @mock.patch("airflow.dag_processing.manager.Stats.gauge")
+    def test_stats_total_parse_time(self, statsd_gauge_mock, tmp_path, configure_testing_dag_bundle):
+        key = "dag_processing.total_parse_time"
+        gauge_values = defaultdict(list)
+        statsd_gauge_mock.side_effect = lambda name, value: gauge_values[name].append(value)
+
+        dag_path = tmp_path / "temp_dag.py"
+        dag_code = textwrap.dedent(
+            """
+            from airflow import DAG
+            dag = DAG(dag_id='temp_dag')
+            """
+        )
+        dag_path.write_text(dag_code)
+
+        with configure_testing_dag_bundle(tmp_path):
+            manager = DagFileProcessorManager(max_runs=0)
+
+            for _ in range(3):
+                manager.max_runs += 1
+                manager.run()
+
+                assert key in gauge_values
+                assert len(gauge_values[key]) == 1
+                assert gauge_values[key][0] >= 1e-4
+
+                dag_path.touch()  # make the loop run faster
+                gauge_values.clear()