Skip to content

Commit fd5825a

Browse files
committed
Add compatibility with minio-py==7.2.19
See minio/minio-py#1536
1 parent 25fd53f commit fd5825a

File tree

1 file changed

+61
-17
lines changed
  • onetl/connection/file_connection

1 file changed

+61
-17
lines changed

onetl/connection/file_connection/s3.py

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,11 @@ def remove_dir(self, path: os.PathLike | str, recursive: bool = False) -> bool:
169169
def _scan_entries_recursive(root: RemotePath) -> Iterable[Object]: # noqa: WPS430
170170
nonlocal is_empty
171171
directory_path_str = self._delete_absolute_path_slash(root) + "/"
172-
entries = self.client.list_objects(self.bucket, prefix=directory_path_str, recursive=True)
172+
entries = self.client.list_objects(
173+
bucket_name=self.bucket,
174+
prefix=directory_path_str,
175+
recursive=True,
176+
)
173177
for entry in entries:
174178
is_empty = False # noqa: WPS442
175179

@@ -207,7 +211,12 @@ def _scan_entries_recursive(root: RemotePath) -> Iterable[Object]: # noqa: WPS4
207211
objects_to_delete = (
208212
DeleteObject(obj.object_name) for obj in _scan_entries_recursive(remote_dir) # type: ignore[arg-type]
209213
)
210-
errors = list(self.client.remove_objects(self.bucket, objects_to_delete))
214+
errors = list(
215+
self.client.remove_objects(
216+
bucket_name=self.bucket,
217+
objects=objects_to_delete,
218+
),
219+
)
211220
if errors:
212221
if log.isEnabledFor(logging.DEBUG):
213222
log.debug(
@@ -228,7 +237,10 @@ def path_exists(self, path: os.PathLike | str) -> bool:
228237
return True
229238

230239
remote_path_str = self._delete_absolute_path_slash(remote_path)
231-
for component in self.client.list_objects(self.bucket, prefix=remote_path_str):
240+
for component in self.client.list_objects( # noqa: WPS352
241+
bucket_name=self.bucket,
242+
prefix=remote_path_str,
243+
):
232244
component_path = RemotePath(component.object_name)
233245
component_path_str = self._delete_absolute_path_slash(component_path)
234246
if component_path_str == remote_path_str:
@@ -280,15 +292,22 @@ def _delete_absolute_path_slash(cls, path: RemotePath) -> str:
280292

281293
def _download_file(self, remote_file_path: RemotePath, local_file_path: LocalPath) -> None:
282294
path_str = self._delete_absolute_path_slash(remote_file_path)
283-
self.client.fget_object(self.bucket, path_str, os.fspath(local_file_path))
295+
self.client.fget_object(
296+
bucket_name=self.bucket,
297+
object_name=path_str,
298+
file_path=os.fspath(local_file_path),
299+
)
284300

285301
def _get_stat(self, path: RemotePath) -> RemotePathStat:
286302
if self._is_root(path):
287303
return RemotePathStat()
288304

289305
# for some reason, client.stat_object returns less precise st_mtime than client.list_objects
290306
path_str = self._delete_absolute_path_slash(path)
291-
objects = self.client.list_objects(self.bucket, prefix=path_str)
307+
objects = self.client.list_objects(
308+
bucket_name=self.bucket,
309+
prefix=path_str,
310+
)
292311
for obj in objects:
293312
object_path = RemotePath(obj.object_name)
294313
object_path_str = self._delete_absolute_path_slash(object_path)
@@ -300,15 +319,22 @@ def _get_stat(self, path: RemotePath) -> RemotePathStat:
300319

301320
def _remove_file(self, remote_file_path: RemotePath) -> None:
302321
path_str = self._delete_absolute_path_slash(remote_file_path)
303-
self.client.remove_object(self.bucket, path_str)
322+
self.client.remove_object(
323+
bucket_name=self.bucket,
324+
object_name=path_str,
325+
)
304326

305327
def _create_dir(self, path: RemotePath) -> None:
306328
# in s3 dirs do not exist
307329
pass
308330

309331
def _upload_file(self, local_file_path: LocalPath, remote_file_path: RemotePath) -> None:
310332
path_str = self._delete_absolute_path_slash(remote_file_path)
311-
self.client.fput_object(self.bucket, path_str, os.fspath(local_file_path))
333+
self.client.fput_object(
334+
bucket_name=self.bucket,
335+
object_name=path_str,
336+
file_path=os.fspath(local_file_path),
337+
)
312338

313339
def _rename_file(self, source: RemotePath, target: RemotePath) -> None:
314340
source_str = self._delete_absolute_path_slash(source)
@@ -323,10 +349,13 @@ def _rename_file(self, source: RemotePath, target: RemotePath) -> None:
323349

324350
def _scan_entries(self, path: RemotePath) -> Iterable[Object]:
325351
if self._is_root(path):
326-
return self.client.list_objects(self.bucket)
352+
return self.client.list_objects(bucket_name=self.bucket)
327353

328354
directory_path_str = self._delete_absolute_path_slash(path) + "/"
329-
objects = self.client.list_objects(self.bucket, prefix=directory_path_str)
355+
objects = self.client.list_objects(
356+
bucket_name=self.bucket,
357+
prefix=directory_path_str,
358+
)
330359
return (obj for obj in objects if obj.object_name != directory_path_str)
331360

332361
def _extract_name_from_entry(self, entry: Object) -> str:
@@ -351,7 +380,10 @@ def _remove_dir(self, path: RemotePath) -> None:
351380
# S3 does not have directories, but some integrations (like Iceberg or Spark 4.0)
352381
# may create empty object with name ending with /, and it will be considered as a directory marker.
353382
# delete such object if it present, and ignore error if it does not
354-
self.client.remove_object(self.bucket, directory_path_str)
383+
self.client.remove_object(
384+
bucket_name=self.bucket,
385+
object_name=directory_path_str,
386+
)
355387
except S3Error as err:
356388
if err.code == "NoSuchKey":
357389
return
@@ -360,22 +392,26 @@ def _remove_dir(self, path: RemotePath) -> None:
360392
def _read_text(self, path: RemotePath, encoding: str, **kwargs) -> str:
361393
path_str = self._delete_absolute_path_slash(path)
362394
file_handler = self.client.get_object(
363-
self.bucket,
364-
path_str,
395+
bucket_name=self.bucket,
396+
object_name=path_str,
365397
**kwargs,
366398
)
367399
return file_handler.read().decode(encoding)
368400

369401
def _read_bytes(self, path: RemotePath, **kwargs) -> bytes:
370402
path_str = self._delete_absolute_path_slash(path)
371-
file_handler = self.client.get_object(self.bucket, path_str, **kwargs)
403+
file_handler = self.client.get_object(
404+
bucket_name=self.bucket,
405+
object_name=path_str,
406+
**kwargs,
407+
)
372408
return file_handler.read()
373409

374410
def _write_text(self, path: RemotePath, content: str, encoding: str, **kwargs) -> None:
375411
content_bytes = content.encode(encoding)
376412
stream = io.BytesIO(content_bytes)
377413
self.client.put_object(
378-
self.bucket,
414+
bucket_name=self.bucket,
379415
data=stream,
380416
object_name=self._delete_absolute_path_slash(path),
381417
length=len(content_bytes),
@@ -385,7 +421,7 @@ def _write_text(self, path: RemotePath, content: str, encoding: str, **kwargs) -
385421
def _write_bytes(self, path: RemotePath, content: bytes, **kwargs) -> None:
386422
stream = io.BytesIO(content)
387423
self.client.put_object(
388-
self.bucket,
424+
bucket_name=self.bucket,
389425
data=stream,
390426
object_name=self._delete_absolute_path_slash(path),
391427
length=len(content),
@@ -398,15 +434,23 @@ def _is_dir(self, path: RemotePath) -> bool:
398434

399435
directory_path_str = self._delete_absolute_path_slash(path) + "/"
400436
try:
401-
next(self.client.list_objects(self.bucket, prefix=directory_path_str))
437+
next(
438+
self.client.list_objects(
439+
bucket_name=self.bucket,
440+
prefix=directory_path_str,
441+
),
442+
)
402443
return True
403444
except StopIteration:
404445
return False
405446

406447
def _is_file(self, path: RemotePath) -> bool:
407448
path_str = self._delete_absolute_path_slash(path)
408449
try:
409-
self.client.stat_object(self.bucket, path_str)
450+
self.client.stat_object(
451+
bucket_name=self.bucket,
452+
object_name=path_str,
453+
)
410454
return True
411455
except S3Error as err:
412456
if err.code == "NoSuchKey":

0 commit comments

Comments
 (0)