Skip to content

Commit e9f48ff

Browse files
committed
fix(s3): Skip empty files when downloading dbt projects from S3
Other users and I have noticed that when uploading files to S3 using the AWS web UI an unnamed file may sometimes appear. Not entirely sure if this is a bug on AWS' side or what, but it's causing us issues when downloading dbt projects from S3 as we attempt to create a file with the name of the parent directory (which usually already exists, or is created and later downloads fail when they are nested inside that file/directory). So this deals with the issue by ignoring files like this. Also added a unit test to cover this case, although it's kind of hard to test the behaviour as this file apparently only appears when using the web UI.
1 parent 33c4803 commit e9f48ff

File tree

2 files changed

+69
-7
lines changed

2 files changed

+69
-7
lines changed

airflow_dbt_python/hooks/s3.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,16 @@ def download_one_s3_object(self, target: Path, s3_object):
5252
"""Download a single s3 object."""
5353
self.log.info("Saving %s file to: %s", s3_object, target)
5454

55-
with open(target, "wb+") as f:
56-
s3_object.download_fileobj(f)
55+
try:
56+
with open(target, "wb+") as f:
57+
s3_object.download_fileobj(f)
58+
59+
except IsADirectoryError:
60+
# Uploading files manually via the AWS UI to S3 can cause files
61+
# with empty names to appear. When we attempt to download it, we build
62+
# a relative path that is equal to the parent directory that already
63+
# exists.
64+
self.log.warning("A file with no name was found in S3 at %s", s3_object)
5765

5866
def get_dbt_project(
5967
self, s3_project_url: str, project_dir: Optional[str] = None
@@ -112,6 +120,13 @@ def download_many_s3_keys(
112120
for s3_object_key in s3_keys:
113121
s3_object = self.get_key(key=s3_object_key, bucket_name=bucket_name)
114122
path_file = Path(s3_object_key).relative_to(prefix)
123+
124+
if path_file.suffix == "" and s3_object.key.endswith("/"):
125+
# Empty S3 files may also be confused with unwanted directories.
126+
# See the comment in download_one_s3_object.
127+
self.log.warning("A file with no name was found in S3 at %s", s3_object)
128+
continue
129+
115130
local_project_file = target_dir / path_file
116131
local_project_file.parent.mkdir(parents=True, exist_ok=True)
117132

tests/hooks/s3/test_dbt_s3_hook.py

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,18 +168,27 @@ def test_get_dbt_project_no_trailing_slash(s3_bucket, tmpdir, dbt_project_file):
168168

169169

170170
@pytest.fixture
171-
def test_files():
172-
f1 = Path("seeds/a_seed.csv")
171+
def test_files(tmp_path_factory):
172+
"""Create test files to upload to S3."""
173+
d = tmp_path_factory.mktemp("test_s3")
174+
seed_dir = d / "seeds"
175+
seed_dir.mkdir(exist_ok=True)
176+
f1 = seed_dir / "a_seed.csv"
177+
173178
with open(f1, "w+") as f:
174179
f.write("col1,col2\n1,2")
175180

176-
f2 = Path("models/a_model.sql")
181+
models_dir = d / "models"
182+
models_dir.mkdir(exist_ok=True)
183+
f2 = models_dir / "a_model.sql"
177184
with open(f2, "w+") as f:
178185
f.write("SELECT 1")
179-
f3 = Path("models/another_model.sql")
186+
f3 = models_dir / "another_model.sql"
180187
with open(f3, "w+") as f:
181188
f.write("SELECT 2")
189+
182190
yield [f1, f2, f3]
191+
183192
f1.unlink()
184193
f2.unlink()
185194
f3.unlink()
@@ -190,11 +199,14 @@ def test_get_dbt_project_from_zip_file(s3_bucket, tmpdir, dbt_project_file, test
190199
with open(dbt_project_file) as pf:
191200
project_content = pf.read()
192201

202+
# Prepare a zip file to upload to S3
193203
zip_buffer = io.BytesIO()
194204
with ZipFile(zip_buffer, "a") as zf:
195205
zf.write(dbt_project_file, "dbt_project.yml")
196206
for f in test_files:
197-
zf.write(f)
207+
# Since files are in a different temporary directory, we need to zip them
208+
# with their direct parent, e.g. models/a_model.sql
209+
zf.write(f, arcname="/".join([f.parts[-2], f.parts[-1]]))
198210

199211
hook = DbtS3Hook()
200212
bucket = hook.get_bucket(s3_bucket)
@@ -230,3 +242,38 @@ def test_get_dbt_project_from_zip_file(s3_bucket, tmpdir, dbt_project_file, test
230242
with open(project_path / "seeds" / "a_seed.csv") as f:
231243
result = f.read()
232244
assert result == "col1,col2\n1,2"
245+
246+
247+
def test_get_dbt_project_with_empty_file(s3_bucket, tmpdir, dbt_project_file):
248+
"""Test whether an S3 path without a trailing slash pulls a dbt project."""
249+
hook = DbtS3Hook()
250+
bucket = hook.get_bucket(s3_bucket)
251+
252+
with open(dbt_project_file) as pf:
253+
project_content = pf.read()
254+
bucket.put_object(Key="project/dbt_project.yml", Body=project_content.encode())
255+
bucket.put_object(Key="project/models/a_model.sql", Body=b"SELECT 1")
256+
bucket.put_object(Key="project/data/a_seed.csv", Body=b"col1,col2\n1,2")
257+
bucket.put_object(Key="project/data//", Body=b"")
258+
259+
project_path = hook.get_dbt_project(
260+
f"s3://{s3_bucket}/project",
261+
project_dir=str(tmpdir),
262+
)
263+
264+
assert project_path.exists()
265+
266+
dir_contents = [f for f in project_path.iterdir()]
267+
assert sorted(str(f.name) for f in dir_contents) == [
268+
"data",
269+
"dbt_project.yml",
270+
"models",
271+
]
272+
273+
with open(project_path / "dbt_project.yml") as f:
274+
result = f.read()
275+
assert result == project_content
276+
277+
with open(project_path / "models" / "a_model.sql") as f:
278+
result = f.read()
279+
assert result == "SELECT 1"

0 commit comments

Comments
 (0)