Skip to content

Commit a881173

Browse files
committed
refactor: Use the new dbt hook for the dbt operators
Like mentioned in the previous commit, this simply replaces all the code used to prepare the CLI flags for the new dbt hook. We also refactored some of the tests.
1 parent 4fdb18e commit a881173

37 files changed

+1632
-1470
lines changed

airflow_dbt_python/operators/dbt.py

Lines changed: 199 additions & 314 deletions
Large diffs are not rendered by default.

tests/conftest.py

Lines changed: 173 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,10 @@
55
"""
66
import boto3
77
import pytest
8-
from dbt.version import __version__ as DBT_VERSION
98
from moto import mock_s3
10-
from packaging.version import parse
119
from pytest_postgresql.janitor import DatabaseJanitor
1210

13-
DBT_VERSION = parse(DBT_VERSION)
14-
IS_DBT_VERSION_LESS_THAN_0_20 = DBT_VERSION.minor < 20 and DBT_VERSION.major == 0
11+
from airflow_dbt_python.hooks.dbt import DbtHook
1512

1613
PROFILES = """
1714
default:
@@ -34,8 +31,7 @@
3431
version: 1.0.0
3532
"""
3633

37-
if not IS_DBT_VERSION_LESS_THAN_0_20:
38-
PROJECT += """
34+
PROJECT += """
3935
dispatch:
4036
- macro_namespace: dbt_utils
4137
search_order: [dbt_utils]
@@ -136,6 +132,7 @@
136132

137133
@pytest.fixture(scope="session")
138134
def database(postgresql_proc):
135+
"""Initialize a test postgres database."""
139136
janitor = DatabaseJanitor(
140137
postgresql_proc.user,
141138
postgresql_proc.host,
@@ -160,6 +157,7 @@ def database(postgresql_proc):
160157

161158
@pytest.fixture(scope="session")
162159
def profiles_file(tmp_path_factory, database):
160+
"""Create a profiles.yml file for testing."""
163161
p = tmp_path_factory.mktemp(".dbt") / "profiles.yml"
164162
profiles_content = PROFILES.format(
165163
host=database.host,
@@ -174,19 +172,22 @@ def profiles_file(tmp_path_factory, database):
174172

175173
@pytest.fixture(scope="session")
176174
def dbt_project_dir(tmp_path_factory):
175+
"""A temporary directory to store dbt test files."""
177176
d = tmp_path_factory.mktemp("project")
178177
return d
179178

180179

181180
@pytest.fixture(scope="session")
182181
def dbt_project_file(dbt_project_dir):
182+
"""Create a test dbt_project.yml file."""
183183
p = dbt_project_dir / "dbt_project.yml"
184184
p.write_text(PROJECT)
185185
return p
186186

187187

188188
@pytest.fixture(scope="session")
189189
def model_files(dbt_project_dir):
190+
"""Create test model files."""
190191
d = dbt_project_dir / "models"
191192
d.mkdir(exist_ok=True)
192193

@@ -200,13 +201,15 @@ def model_files(dbt_project_dir):
200201

201202
@pytest.fixture(scope="session")
202203
def sources_file(model_files, database):
204+
"""Create test source file."""
203205
m = model_files[0].parent / "my_sources.yml"
204206
m.write_text(SOURCES)
205207
return m
206208

207209

208210
@pytest.fixture(scope="session")
209211
def seed_files(dbt_project_dir):
212+
"""Create test seed files."""
210213
d = dbt_project_dir / "data"
211214
d.mkdir(exist_ok=True)
212215
s1 = d / "seed_1.csv"
@@ -218,18 +221,21 @@ def seed_files(dbt_project_dir):
218221

219222
@pytest.fixture(scope="session")
220223
def compile_dir(dbt_project_file):
224+
"""Return a path to the directory with compiled files."""
221225
d = dbt_project_file.parent
222226
return d / "target" / "compiled" / "test" / "models"
223227

224228

225229
@pytest.fixture
226230
def mocked_s3_res():
231+
"""Return a mocked s3 resource."""
227232
with mock_s3():
228233
yield boto3.resource("s3")
229234

230235

231236
@pytest.fixture
232237
def s3_bucket(mocked_s3_res):
238+
"""Return a mocked s3 bucket for testing."""
233239
bucket = "airflow-dbt-test-s3-bucket"
234240
mocked_s3_res.create_bucket(Bucket=bucket)
235241
return bucket
@@ -247,8 +253,169 @@ def s3_bucket(mocked_s3_res):
247253

248254
@pytest.fixture
249255
def broken_file(dbt_project_dir):
256+
"""Create a malformed SQL file for testing."""
250257
d = dbt_project_dir / "models"
251258
m = d / "broken.sql"
252259
m.write_text(BROKEN_SQL)
253260
yield m
254261
m.unlink()
262+
263+
264+
@pytest.fixture(scope="function")
265+
def dbt_modules_dir(dbt_project_file):
266+
"""Create a dbt_modules dir to install packages."""
267+
d = dbt_project_file.parent
268+
return d / "dbt_modules"
269+
270+
271+
PACKAGES = """
272+
packages:
273+
- package: dbt-labs/dbt_utils
274+
version: 0.7.3
275+
"""
276+
277+
278+
@pytest.fixture(scope="session")
279+
def packages_file(dbt_project_file):
280+
"""Create a test packages.yml file."""
281+
d = dbt_project_file.parent
282+
packages = d / "packages.yml"
283+
packages.write_text(PACKAGES)
284+
return packages
285+
286+
287+
@pytest.fixture
288+
def hook():
289+
"""Provide a DbtHook."""
290+
return DbtHook()
291+
292+
293+
@pytest.fixture
294+
def pre_compile(hook, dbt_project_file, profiles_file):
295+
"""Fixture to run a dbt compile task."""
296+
import shutil
297+
298+
factory = hook.get_config_factory("run")
299+
config = factory.create_config(
300+
project_dir=dbt_project_file.parent,
301+
profiles_dir=profiles_file.parent,
302+
)
303+
hook.run_dbt_task(config)
304+
yield
305+
target_dir = dbt_project_file.parent / "target"
306+
shutil.rmtree(target_dir, ignore_errors=True)
307+
308+
309+
SCHEMA_TESTS = """
310+
version: 2
311+
312+
models:
313+
- name: model_2
314+
columns:
315+
- name: field1
316+
tests:
317+
- unique
318+
- not_null
319+
- accepted_values:
320+
values: ['123', '456']
321+
- name: field2
322+
tests:
323+
- unique
324+
- not_null
325+
"""
326+
327+
328+
@pytest.fixture(scope="session")
329+
def schema_tests_files(dbt_project_dir):
330+
"""Create a dbt schema test YAML file."""
331+
d = dbt_project_dir / "models"
332+
d.mkdir(exist_ok=True)
333+
334+
schema = d / "schema.yml"
335+
schema.write_text(SCHEMA_TESTS)
336+
337+
return [schema]
338+
339+
340+
DATA_TEST_1 = """
341+
SELECT *
342+
FROM {{ ref('model_2' )}}
343+
WHERE field1 != 123
344+
"""
345+
346+
DATA_TEST_2 = """
347+
SELECT *
348+
FROM {{ ref('model_4' )}}
349+
WHERE field1 != 123
350+
"""
351+
352+
353+
@pytest.fixture(scope="session")
354+
def data_tests_files(dbt_project_dir):
355+
"""Create data test files."""
356+
d = dbt_project_dir / "test"
357+
d.mkdir(exist_ok=True)
358+
359+
test1 = d / "data_test_1.sql"
360+
test1.write_text(DATA_TEST_1)
361+
362+
test2 = d / "data_test_2.sql"
363+
test2.write_text(DATA_TEST_2)
364+
365+
return [test1, test2]
366+
367+
368+
SNAPSHOT_1 = """
369+
{% snapshot test_snapshot %}
370+
371+
{{
372+
config(
373+
target_database='test',
374+
target_schema='test',
375+
unique_key='id_field',
376+
377+
strategy='timestamp',
378+
updated_at='time_field',
379+
)
380+
}}
381+
382+
SELECT
383+
1 AS id_field,
384+
'abc' AS value_field,
385+
NOW() AS time_field
386+
387+
{% endsnapshot %}
388+
"""
389+
390+
391+
@pytest.fixture(scope="session")
392+
def snapshot_files(dbt_project_dir):
393+
"""Create dbt snapshot files."""
394+
d = dbt_project_dir / "snapshots"
395+
d.mkdir(exist_ok=True)
396+
397+
snap = d / "snapshot_1.sql"
398+
snap.write_text(SNAPSHOT_1)
399+
400+
return [snap]
401+
402+
403+
MACRO = """
404+
{% macro my_macro(an_arg) %}
405+
{% set sql %}
406+
SELECT {{ an_arg }} as the_arg;
407+
{% endset %}
408+
409+
{% do run_query(sql) %}
410+
{% endmacro %}
411+
"""
412+
413+
414+
@pytest.fixture
415+
def macro_file(dbt_project_dir):
416+
"""Create a dbt macro file."""
417+
d = dbt_project_dir / "macros"
418+
d.mkdir(exist_ok=True)
419+
m = d / "my_macro.sql"
420+
m.write_text(MACRO)
421+
return m

tests/hooks/dbt/test_dbt_build.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""Unit test module for running dbt build with the DbtHook."""
2+
from dbt.contracts.results import RunStatus, TestStatus
3+
4+
5+
def test_dbt_build_task(
6+
hook,
7+
profiles_file,
8+
dbt_project_file,
9+
model_files,
10+
seed_files,
11+
schema_tests_files,
12+
data_tests_files,
13+
):
14+
"""Test a dbt build task."""
15+
factory = hook.get_config_factory("build")
16+
config = factory.create_config(
17+
project_dir=dbt_project_file.parent,
18+
profiles_dir=profiles_file.parent,
19+
)
20+
success, results = hook.run_dbt_task(config)
21+
22+
assert success is True
23+
assert len(results.results) == 12
24+
25+
for result in results.results:
26+
assert result.status == RunStatus.Success or result.status == TestStatus.Pass
27+
28+
29+
def test_dbt_build_task_non_existent_model(
30+
hook,
31+
profiles_file,
32+
dbt_project_file,
33+
model_files,
34+
seed_files,
35+
schema_tests_files,
36+
data_tests_files,
37+
):
38+
"""Test a dbt build task with non existent model."""
39+
factory = hook.get_config_factory("build")
40+
config = factory.create_config(
41+
project_dir=dbt_project_file.parent,
42+
profiles_dir=profiles_file.parent,
43+
select=["missing"],
44+
)
45+
success, results = hook.run_dbt_task(config)
46+
47+
assert success is True
48+
assert len(results.results) == 0

tests/hooks/dbt/test_dbt_clean.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""Unit test module for running dbt clean with the DbtHook."""
2+
3+
4+
def test_dbt_clean_task(
5+
hook, profiles_file, dbt_project_file, model_files, pre_compile
6+
):
7+
"""Test a dbt clean task."""
8+
factory = hook.get_config_factory("clean")
9+
config = factory.create_config(
10+
project_dir=dbt_project_file.parent,
11+
profiles_dir=profiles_file.parent,
12+
)
13+
14+
compile_dir = dbt_project_file.parent / "target"
15+
assert compile_dir.exists() is True
16+
17+
success, results = hook.run_dbt_task(config)
18+
assert success is True
19+
assert compile_dir.exists() is False
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""Unit test module for running dbt compile with the DbtHook."""
2+
3+
4+
def test_dbt_compile_non_existent_model(
5+
hook, profiles_file, dbt_project_file, model_files
6+
):
7+
"""Test a dbt compile task with a non existent model."""
8+
factory = hook.get_config_factory("compile")
9+
config = factory.create_config(
10+
project_dir=dbt_project_file.parent,
11+
profiles_dir=profiles_file.parent,
12+
select=["fake"],
13+
full_refresh=True,
14+
)
15+
success, results = hook.run_dbt_task(config)
16+
assert success is True
17+
assert len(results.results) == 0
18+
19+
20+
def test_dbt_compile_task(hook, profiles_file, dbt_project_file, model_files):
21+
"""Test a dbt compile task."""
22+
import shutil
23+
24+
compile_dir = dbt_project_file.parent / "target"
25+
shutil.rmtree(compile_dir)
26+
assert compile_dir.exists() is False
27+
28+
factory = hook.get_config_factory("compile")
29+
config = factory.create_config(
30+
project_dir=dbt_project_file.parent,
31+
profiles_dir=profiles_file.parent,
32+
)
33+
34+
success, results = hook.run_dbt_task(config)
35+
assert success is True
36+
assert compile_dir.exists() is True

0 commit comments

Comments
 (0)