Skip to content

Commit 6ccd803

Browse files
authored
feat: Support for dbt 0.21 (#10)
* feat: Add support for new dbt build command and arguments * feat: Support select argument for run, test, and compile operators. dbt 0.21 added the select flag to the run, test, and compile commands with the intention for it to replace models and bring them in line with the rest of the dbt commands. We have added support for a select argument but kept backwards compatibility with models, as we intend to support dbt versions >= 0.19 (at least for the moment).
1 parent 515265a commit 6ccd803

File tree

9 files changed

+542
-32
lines changed

9 files changed

+542
-32
lines changed

airflow_dbt_python/operators/dbt.py

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@
1414
from dbt.contracts.results import RunExecutionResult, RunResult, agate
1515
from dbt.logger import log_manager
1616
from dbt.main import initialize_config_values, parse_args, track_run
17+
from dbt.semver import VersionSpecifier
18+
from dbt.version import installed as installed_version
1719

1820
from airflow import AirflowException
1921
from airflow.models.baseoperator import BaseOperator
2022
from airflow.models.xcom import XCOM_RETURN_KEY
2123
from airflow.utils.decorators import apply_defaults
2224

25+
IS_DBT_VERSION_LESS_THAN_0_21 = (
26+
int(installed_version.minor) < 21 and int(installed_version.major) == 0
27+
)
28+
2329

2430
class DbtBaseOperator(BaseOperator):
2531
"""The basic Airflow dbt operator.
@@ -295,6 +301,7 @@ class DbtRunOperator(DbtBaseOperator):
295301
__dbt_args__ = DbtBaseOperator.__dbt_args__ + [
296302
"full_refresh",
297303
"models",
304+
"select",
298305
"fail_fast",
299306
"threads",
300307
"exclude",
@@ -308,6 +315,7 @@ def __init__(
308315
self,
309316
full_refresh: Optional[bool] = None,
310317
models: Optional[list[str]] = None,
318+
select: Optional[list[str]] = None,
311319
fail_fast: Optional[bool] = None,
312320
threads: Optional[int] = None,
313321
exclude: Optional[list[str]] = None,
@@ -319,7 +327,6 @@ def __init__(
319327
) -> None:
320328
super().__init__(**kwargs)
321329
self.full_refresh = full_refresh
322-
self.models = models
323330
self.fail_fast = fail_fast
324331
self.threads = threads
325332
self.exclude = exclude
@@ -328,6 +335,11 @@ def __init__(
328335
self.defer = defer
329336
self.no_defer = no_defer
330337

338+
if IS_DBT_VERSION_LESS_THAN_0_21:
339+
self.models = models or select
340+
else:
341+
self.select = select or models
342+
331343

332344
class DbtSeedOperator(DbtBaseOperator):
333345
"""Executes dbt seed."""
@@ -375,6 +387,7 @@ class DbtTestOperator(DbtBaseOperator):
375387
"schema",
376388
"fail_fast",
377389
"models",
390+
"select",
378391
"threads",
379392
"exclude",
380393
"selector",
@@ -388,6 +401,7 @@ def __init__(
388401
data: Optional[bool] = None,
389402
schema: Optional[bool] = None,
390403
models: Optional[list[str]] = None,
404+
select: Optional[list[str]] = None,
391405
fail_fast: Optional[bool] = None,
392406
threads: Optional[int] = None,
393407
exclude: Optional[list[str]] = None,
@@ -400,7 +414,6 @@ def __init__(
400414
super().__init__(**kwargs)
401415
self.data = data
402416
self.schema = schema
403-
self.models = models
404417
self.fail_fast = fail_fast
405418
self.threads = threads
406419
self.exclude = exclude
@@ -409,6 +422,11 @@ def __init__(
409422
self.defer = defer
410423
self.no_defer = no_defer
411424

425+
if IS_DBT_VERSION_LESS_THAN_0_21:
426+
self.models = models or select
427+
else:
428+
self.select = select or models
429+
412430

413431
class DbtCompileOperator(DbtBaseOperator):
414432
"""Executes dbt compile."""
@@ -421,6 +439,7 @@ class DbtCompileOperator(DbtBaseOperator):
421439
"fail_fast",
422440
"threads",
423441
"models",
442+
"select",
424443
"exclude",
425444
"selector",
426445
"state",
@@ -431,6 +450,7 @@ def __init__(
431450
parse_only: Optional[bool] = None,
432451
full_refresh: Optional[bool] = None,
433452
models: Optional[list[str]] = None,
453+
select: Optional[list[str]] = None,
434454
fail_fast: Optional[bool] = None,
435455
threads: Optional[int] = None,
436456
exclude: Optional[list[str]] = None,
@@ -441,13 +461,17 @@ def __init__(
441461
super().__init__(**kwargs)
442462
self.parse_only = parse_only
443463
self.full_refresh = full_refresh
444-
self.models = models
445464
self.fail_fast = fail_fast
446465
self.threads = threads
447466
self.exclude = exclude
448467
self.selector = selector
449468
self.state = state
450469

470+
if IS_DBT_VERSION_LESS_THAN_0_21:
471+
self.models = models or select
472+
else:
473+
self.select = select or models
474+
451475

452476
class DbtDepsOperator(DbtBaseOperator):
453477
"""Executes dbt deps."""
@@ -523,29 +547,29 @@ class DbtLsOperator(DbtBaseOperator):
523547
__dbt_args__ = DbtBaseOperator.__dbt_args__ + [
524548
"resource_type",
525549
"select",
526-
"models",
527550
"exclude",
528551
"selector",
529552
"dbt_output",
553+
"output_keys",
530554
]
531555

532556
def __init__(
533557
self,
534558
resource_type: Optional[list[str]] = None,
535559
select: Optional[list[str]] = None,
536-
models: Optional[list[str]] = None,
537560
exclude: Optional[list[str]] = None,
538561
selector: Optional[str] = None,
539562
dbt_output: Optional[str] = None,
563+
output_keys: Optional[list[str]] = None,
540564
**kwargs,
541565
) -> None:
542566
super().__init__(**kwargs)
543567
self.resource_type = resource_type
544568
self.select = select
545-
self.models = models
546569
self.exclude = exclude
547570
self.selector = selector
548571
self.dbt_output = dbt_output
572+
self.output_keys = output_keys
549573

550574

551575
# Convenience alias
@@ -590,16 +614,19 @@ class DbtSourceOperator(DbtBaseOperator):
590614

591615
__dbt_args__ = DbtBaseOperator.__dbt_args__ + [
592616
"select",
593-
"models",
617+
"threads",
594618
"exclude",
595619
"selector",
620+
"state",
596621
"dbt_output",
597622
]
598623

599624
def __init__(
600625
self,
601626
# Only one subcommand is currently provided
602-
subcommand: str = "snapshot-freshness",
627+
subcommand: str = "freshness"
628+
if not installed_version < VersionSpecifier.from_version_string("0.21.0")
629+
else "snapshot-freshness",
603630
select: Optional[list[str]] = None,
604631
dbt_output: Optional[Union[str, Path]] = None,
605632
threads: Optional[int] = None,
@@ -610,11 +637,67 @@ def __init__(
610637
) -> None:
611638
super().__init__(positional_args=[subcommand], **kwargs)
612639
self.select = select
640+
self.threads = threads
641+
self.exclude = exclude
642+
self.selector = selector
643+
self.state = state
613644
self.dbt_output = dbt_output
645+
646+
647+
class DbtBuildOperator(DbtBaseOperator):
    """Airflow operator that executes ``dbt build``.

    ``dbt build`` bundles the run, test, seed, and snapshot commands into a
    single invocation. Reference documentation for the command:
    https://docs.getdbt.com/reference/commands/build.

    Every constructor argument is stored on the instance under the same name
    that is listed in ``__dbt_args__``; all of them default to ``None``.
    Remaining keyword arguments are passed on to ``DbtBaseOperator``.
    """

    command = "build"

    # Names of the instance attributes that belong to the build command, on
    # top of the ones declared by the base operator.
    # NOTE(review): presumably the base operator turns these into dbt CLI
    # flags -- confirm against DbtBaseOperator.
    __dbt_args__ = [
        *DbtBaseOperator.__dbt_args__,
        "full_refresh",
        "select",
        "fail_fast",
        "threads",
        "exclude",
        "selector",
        "state",
        "defer",
        "no_defer",
        "data",
        "schema",
        "show",
    ]

    def __init__(
        self,
        full_refresh: Optional[bool] = None,
        select: Optional[list[str]] = None,
        fail_fast: Optional[bool] = None,
        threads: Optional[int] = None,
        exclude: Optional[list[str]] = None,
        selector: Optional[str] = None,
        state: Optional[Union[str, Path]] = None,
        defer: Optional[bool] = None,
        no_defer: Optional[bool] = None,
        data: Optional[bool] = None,
        schema: Optional[bool] = None,
        show: Optional[bool] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        # Node selection.
        self.select = select
        self.exclude = exclude
        self.selector = selector
        self.state = state

        # Execution behaviour.
        self.full_refresh = full_refresh
        self.fail_fast = fail_fast
        self.threads = threads
        self.defer = defer
        self.no_defer = no_defer

        # Remaining build flags.
        self.data = data
        self.schema = schema
        self.show = show
618701

619702

620703
def run_result_factory(data: list[tuple[Any, Any]]):

tests/conftest.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,3 +228,22 @@ def s3_bucket(mocked_s3_res):
228228
bucket = "airflow-dbt-test-s3-bucket"
229229
mocked_s3_res.create_bucket(Bucket=bucket)
230230
return bucket
231+
232+
233+
BROKEN_SQL = """
234+
SELECT
235+
field1 AS field1
236+
FROM
237+
non_existent_table
238+
WHERE
239+
field1 > 1
240+
"""
241+
242+
243+
@pytest.fixture
def broken_file(dbt_project_dir):
    """Create ``models/broken.sql`` holding ``BROKEN_SQL`` for one test.

    Yields the path of the written file and deletes it again once the
    requesting test has finished.
    """
    broken_model = dbt_project_dir / "models" / "broken.sql"
    broken_model.write_text(BROKEN_SQL)

    yield broken_model

    # Teardown: remove the file so it does not persist after the test.
    broken_model.unlink()

0 commit comments

Comments
 (0)