From 505d5eb4cb932f8e71cf0a41e39b8b5503a1b64e Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Fri, 20 Feb 2026 00:24:55 +0900 Subject: [PATCH] fix triggerer logger's file descriptor closed when it removed --- .../src/airflow/jobs/triggerer_job_runner.py | 15 +++++++++--- .../tests/unit/jobs/test_triggerer_job.py | 23 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index ba083b8fad3cb..c384fb725ec57 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -30,7 +30,7 @@ from datetime import datetime from socket import socket from traceback import format_exception -from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict +from typing import TYPE_CHECKING, Annotated, Any, BinaryIO, ClassVar, Literal, TextIO, TypedDict import anyio import attrs @@ -300,6 +300,8 @@ class TriggerLoggingFactory: bound_logger: WrappedLogger = attrs.field(init=False, repr=False) + _filehandle: TextIO | BinaryIO = attrs.field(init=False, repr=False) + def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedLogger: if hasattr(self, "bound_logger"): return self.bound_logger @@ -310,13 +312,20 @@ def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedL pretty_logs = False if pretty_logs: - underlying_logger: WrappedLogger = structlog.WriteLogger(log_file.open("w", buffering=1)) + self._filehandle = log_file.open("w", buffering=1) + underlying_logger: WrappedLogger = structlog.WriteLogger(self._filehandle) else: - underlying_logger = structlog.BytesLogger(log_file.open("wb")) + self._filehandle = log_file.open("wb") + underlying_logger = structlog.BytesLogger(self._filehandle) logger = structlog.wrap_logger(underlying_logger, processors=processors).bind() self.bound_logger = logger return logger + def __del__(self): + # Explicitly close the file descriptor when the logger is garbage collected. + if hasattr(self, "_filehandle") and self._filehandle: + self._filehandle.close() + def upload_to_remote(self): from airflow.sdk.log import upload_to_remote diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 7ba89562ad0c4..b3a59cef351f1 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -303,6 +303,29 @@ def test_trigger_log(mock_monotonic, trigger, watcher_count, trigger_count, sess trigger_runner_supervisor.kill(force=False) +def test_trigger_logger_fd_closed_when_removed(session): + + trigger = TimeDeltaTrigger(datetime.timedelta(seconds=0.5)) + + create_trigger_in_db(session, trigger) + + mock_file = MagicMock() + mock_file.closed = False + + with patch("airflow.sdk.log.init_log_file") as mock_init_log_file: + mock_init_log_file.return_value.open.return_value = mock_file + + trigger_runner_supervisor = TriggerRunnerSupervisor.start(job=Job(id=123456), capacity=10) + trigger_runner_supervisor.load_triggers() + + for _ in range(30): + trigger_runner_supervisor._service_subprocess(0.1) + + mock_file.close.assert_called_once() + + trigger_runner_supervisor.kill(force=False) + + class TestTriggerRunner: def test_run_inline_trigger_canceled(self, session) -> None: trigger_runner = TriggerRunner()