|
6 | 6 | from contextlib import contextmanager |
7 | 7 | from dataclasses import asdict, is_dataclass |
8 | 8 | from pathlib import Path |
9 | | -from tempfile import NamedTemporaryFile, TemporaryDirectory |
| 9 | +from tempfile import TemporaryDirectory |
10 | 10 | from typing import Any, Iterator, Optional, Union |
11 | 11 | from urllib.parse import urlparse |
12 | 12 |
|
@@ -241,23 +241,18 @@ def dbt_s3_hook(self): |
def override_dbt_logging(self, dbt_directory: Optional[str] = None):
    """Override dbt's logger so its output lands in the Airflow task log.

    We override the output stream of the dbt logger to use Airflow's
    StreamLogWriter so that we can ensure dbt logs properly to the
    Airflow task's log output.

    NOTE(review): this function yields, so it is presumably decorated with
    ``@contextmanager`` just above this hunk — confirm against the full file.

    :param dbt_directory: Directory handed to ``log_manager.set_path`` so
        dbt's file-based logging writes there. Annotated ``Optional[str]``
        (the declared default is ``None``; the original hunk's ``str = None``
        annotation contradicted its own default).
    """
    # Imported locally, as in the diffed code — keeps the airflow logging
    # dependency out of module import time.
    from airflow.utils.log.logging_mixin import StreamLogWriter

    with log_manager.applicationbound():
        log_manager.reset_handlers()
        log_manager.set_path(dbt_directory)
        # Route dbt's stream output into this task's logger at the
        # logger's currently effective level.
        log_manager.set_output_stream(
            StreamLogWriter(self.log, self.log.getEffectiveLevel())
        )
        # Run the wrapped dbt command while the override is active.
        yield
|
262 | 257 | def run_dbt_command(self, args: list[Optional[str]]) -> tuple[RunResult, bool]: |
263 | 258 | """Run a dbt command as implemented by a subclass.""" |
|
0 commit comments