46 changes: 24 additions & 22 deletions airflow-core/src/airflow/dag_processing/manager.py
@@ -393,11 +393,16 @@ def _run_parsing_loop(self):
            self._refresh_dag_bundles(known_files=known_files)

            if not self._file_queue:
+                if self._parsing_start_time:
Member:
Could you add a simple test:

1. First cycle with an empty queue does not emit the parse-time metric and does not crash;
2. a subsequent empty-queue transition emits dag_processing.total_parse_time from the previous cycle's window;
3. the _parsing_start_time reset semantics are correct.
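A minimal sketch of what such tests could look like. The manager fixture and the _emit_metrics_if_queue_empty helper (factoring the new empty-queue branch out of _run_parsing_loop) are hypothetical, not part of this PR:

import time
from unittest import mock


def test_first_empty_cycle_skips_metric(manager):
    # On the very first cycle _parsing_start_time is still None, so the
    # guard must skip emission entirely rather than raise.
    manager._parsing_start_time = None
    with mock.patch("airflow.dag_processing.manager.emit_metrics") as emit:
        manager._emit_metrics_if_queue_empty()  # hypothetical helper
    emit.assert_not_called()
    assert manager._parsing_start_time is not None  # new window started


def test_second_empty_cycle_emits_previous_window(manager):
    # A prior cycle set the start time; the next empty-queue transition
    # should report the elapsed window and reset the clock.
    manager._parsing_start_time = time.perf_counter() - 1.0
    with mock.patch("airflow.dag_processing.manager.emit_metrics") as emit:
        manager._emit_metrics_if_queue_empty()
    emit.assert_called_once()
    assert emit.call_args.kwargs["parse_time"] >= 1.0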

Member:

self._parsing_start_time is defined with init=False:

_parsing_start_time: float = attrs.field(init=False)

So this will fail because it hasn't been assigned yet.

Contributor (Author):

Ah, sorry, yes. It should be a hasattr check or, more likely, we should update the field to have a default of None.
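A minimal sketch of that proposed fix, assuming the field stays on the attrs-decorated manager class (class body abbreviated to the one field):

from __future__ import annotations

import attrs


@attrs.define
class DagFileProcessorManager:
    # Defaulting to None (rather than leaving the init=False field unset)
    # makes the first-cycle guard `if self._parsing_start_time:` evaluate
    # falsy instead of raising AttributeError on an unassigned slot.
    _parsing_start_time: float | None = attrs.field(init=False, default=None)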

+                    emit_metrics(
+                        parse_time=time.perf_counter() - self._parsing_start_time,
+                        stats=self._file_stats.values(),
+                    )
+                self._parsing_start_time = time.perf_counter()
                # Generate more file paths to process if we processed all the files already. Note for this to
                # clear down, we must have cleared all files found from scanning the dags dir _and_ have
                # cleared all files added as a result of callbacks
                self.prepare_file_queue(known_files=known_files)
-                self.emit_metrics()

            self._start_new_processes()

@@ -1078,7 +1083,6 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):

        Note this method is only called when the file path queue is empty
        """
-        self._parsing_start_time = time.perf_counter()
        # If the file path is already being processed, or if a file was
        # processed recently, wait until the next batch
        in_progress = set(self._processors)
@@ -1195,27 +1199,25 @@ def end(self):
        if pids_to_kill:
            kill_child_processes_by_pids(pids_to_kill)

-    def emit_metrics(self):
-        """
-        Emit metrics about dag parsing summary.
-
-        This is called once every time around the parsing "loop" - i.e. after
-        all files have been parsed.
-        """
-        with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
-            parse_time = time.perf_counter() - self._parsing_start_time
-            Stats.gauge("dag_processing.total_parse_time", parse_time)
-            Stats.gauge("dagbag_size", sum(stat.num_dags for stat in self._file_stats.values()))
-            Stats.gauge(
-                "dag_processing.import_errors", sum(stat.import_errors for stat in self._file_stats.values())
-            )
-            span.set_attributes(
-                {
-                    "total_parse_time": parse_time,
-                    "dag_bag_size": sum(stat.num_dags for stat in self._file_stats.values()),
-                    "import_errors": sum(stat.import_errors for stat in self._file_stats.values()),
-                }
-            )
+def emit_metrics(*, parse_time: float, stats: Sequence[DagFileStat]):
+    """
+    Emit metrics about dag parsing summary.
+
+    This is called once every time around the parsing "loop" - i.e. after
+    all files have been parsed.
+    """
+    with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
+        Stats.gauge("dag_processing.total_parse_time", parse_time)
+        Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats))
+        Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats))
+        span.set_attributes(
+            {
+                "total_parse_time": parse_time,
+                "dag_bag_size": sum(stat.num_dags for stat in stats),
+                "import_errors": sum(stat.import_errors for stat in stats),
+            }
+        )


def process_parse_results(
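Because emit_metrics is now a module-level function over plain arguments, it can be exercised in isolation. A minimal sketch, assuming Stats and DebugTrace are patchable at the manager module path (the patch targets and the mocked stat fields are assumptions):

from unittest import mock

from airflow.dag_processing.manager import emit_metrics


def test_emit_metrics_gauges():
    # Two parsed files: 3 + 2 DAGs and one import error in total.
    stats = [
        mock.Mock(num_dags=3, import_errors=0),
        mock.Mock(num_dags=2, import_errors=1),
    ]
    with mock.patch("airflow.dag_processing.manager.Stats") as stats_mock, mock.patch(
        "airflow.dag_processing.manager.DebugTrace"
    ):
        emit_metrics(parse_time=1.25, stats=stats)
    stats_mock.gauge.assert_any_call("dag_processing.total_parse_time", 1.25)
    stats_mock.gauge.assert_any_call("dagbag_size", 5)
    stats_mock.gauge.assert_any_call("dag_processing.import_errors", 1)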