Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions archipy/adapters/minio/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,3 +756,58 @@ def get_bucket_policy(self, bucket_name: str) -> MinioPolicyType:
# Convert policy to MinioPolicyType format
policy_dict: MinioPolicyType = {"policy": policy}
return policy_dict

@override
def copy_object(
self,
src_bucket_name: str,
src_object_name: str,
dest_bucket_name: str,
dest_object_name: str,
) -> None:
"""Copy an object within or between buckets.

Args:
src_bucket_name: Source bucket name.
src_object_name: Source object name.
dest_bucket_name: Destination bucket name.
dest_object_name: Destination object name.

Raises:
InvalidArgumentError: If any required parameter is empty.
NotFoundError: If the source bucket or object does not exist.
PermissionDeniedError: If permission to copy is denied.
ServiceUnavailableError: If the S3 service is unavailable.
StorageError: If there's a storage-related error.
"""
try:
if not src_bucket_name or not src_object_name or not dest_bucket_name or not dest_object_name:
raise InvalidArgumentError(
argument_name=(
"src_bucket_name, src_object_name, dest_bucket_name or dest_object_name"
if not all([src_bucket_name, src_object_name, dest_bucket_name, dest_object_name])
else "src_bucket_name"
if not src_bucket_name
else "src_object_name"
if not src_object_name
else "dest_bucket_name"
if not dest_bucket_name
else "dest_object_name"
),
)
self._client.copy_object(
Bucket=dest_bucket_name,
CopySource=f"{src_bucket_name}/{src_object_name}",
Key=dest_object_name,
)
if hasattr(self.list_objects, "clear_cache"):
self.list_objects.clear_cache()
except InvalidArgumentError:
# Pass through our custom errors
raise
except ClientError as e:
self._handle_client_exception(e, "copy_object")
except (ConnectionError, EndpointConnectionError) as e:
self._handle_connection_exception(e, "copy_object")
except Exception as e:
self._handle_general_exception(e, "copy_object")
11 changes: 11 additions & 0 deletions archipy/adapters/minio/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,14 @@ def set_bucket_policy(self, bucket_name: str, policy: str) -> None:
def get_bucket_policy(self, bucket_name: str) -> MinioPolicyType:
"""Get bucket policy."""
raise NotImplementedError

@abstractmethod
def copy_object(
self,
src_bucket_name: str,
src_object_name: str,
dest_bucket_name: str,
dest_object_name: str,
) -> None:
"""Copy an object within or between buckets."""
raise NotImplementedError
7 changes: 7 additions & 0 deletions features/minio_adapter.feature
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ Feature: MinIO Operations Testing
And I delete bucket "test-bucket"
Then the object "test.txt" should not exist in bucket "test-bucket"
And the bucket "test-bucket" should not exist

Scenario: Copy object within same bucket
Given a bucket named "test-bucket" exists
And an object "test.txt" exists with content "Hello World" in bucket "test-bucket"
When I copy object "test.txt" from bucket "test-bucket" to "test-copy.txt" in the same bucket
Then the object "test-copy.txt" should exist in bucket "test-bucket"
And downloading "test-copy.txt" from "test-bucket" should return content "Hello World"
12 changes: 12 additions & 0 deletions features/steps/minio_adapter_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ def step_delete_bucket(context, bucket_name):
context.logger.info(f"Deleted bucket '{bucket_name}'")


@when('I copy object "{src_object}" from bucket "{src_bucket}" to "{dest_object}" in the same bucket')
def step_copy_object_same_bucket(context, src_object, src_bucket, dest_object):
adapter = get_minio_adapter(context)
adapter.copy_object(
src_bucket_name=src_bucket,
src_object_name=src_object,
dest_bucket_name=src_bucket,
dest_object_name=dest_object,
)
context.logger.info(f"Copied '{src_object}' to '{dest_object}' in '{src_bucket}'")


# Then steps
@then('the bucket "{bucket_name}" should exist')
def step_bucket_should_exist(context, bucket_name):
Expand Down
Loading