Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## PR Checklist
- [ ] Merged latest master
- [ ] Updated version number in `pyproject.toml`.
- [ ] Added tests for new features or bug fixes.
- [ ] Passed all tests
- [ ] Updated README.md if needed.

## Breaking Changes
1 change: 0 additions & 1 deletion .github/tasks.md
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
## Tasks
Comment thread
TrevorBurgoyne marked this conversation as resolved.
- []
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ S3MPConfig.set_mirror_root("s3_mirror")
S3MPConfig.assume_role("arn:aws:iam::<account-id>:role/<role-name>")
```

To work with several S3 buckets or IAM roles in one project, create `MirrorPath` objects with different `bucket_key` and `iam_role_arn` parameters. Each `MirrorPath` uses the matching session and client under the hood, so you can interact with multiple buckets and roles side by side. When no bucket or role is specified, the defaults from `S3MPConfig` are used: if `S3MPConfig.assume_role(...)` was called, that role provides the default session; otherwise the ambient AWS credentials are used.

```python
from S3MP.mirror_path import MirrorPath

# MirrorPath using the default bucket and default session from S3MPConfig
default_mp = MirrorPath.from_s3_key("path/to/object.jpg")
# MirrorPath using specific bucket and IAM role
custom_mp = MirrorPath.from_s3_key(
"path/to/object.jpg",
bucket_key="custom-bucket",
iam_role_arn="arn:aws:iam::<account-id>:role/<role-name>"
)
# MirrorPath using the bucket from an S3 URL and the default session
# (the bucket key is parsed from the URL)
url_mp = MirrorPath.from_s3_key("s3://custom-bucket/path/to/object.jpg")
```


## Installation
[uv](https://docs.astral.sh/uv/) is a fast, cross-platform Python package installer and resolver.

Expand Down
3 changes: 2 additions & 1 deletion S3MP/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""S3 MirrorPath package."""

from S3MP._version import __version__
from S3MP.global_config import S3Session

__all__ = ["__version__"]
__all__ = ["__version__", "S3Session"]
16 changes: 14 additions & 2 deletions S3MP/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,20 @@
async def async_upload_from_mirror(mirror_path: MirrorPath):
    """Asynchronously upload a MirrorPath's local file to its S3 key.

    When ``mirror_path.iam_role_arn`` is set, that role is assumed through STS
    and the upload runs with the returned temporary credentials. Otherwise a
    plain ``aioboto3.Session()`` is used.

    Args:
        mirror_path: Supplies ``iam_role_arn``, ``bucket_key``, ``local_path``
            and ``s3_key`` for the transfer.
    """
    session = aioboto3.Session()
    if mirror_path.iam_role_arn:
        async with session.client("sts") as sts_client:
            assumed = await sts_client.assume_role(
                RoleArn=mirror_path.iam_role_arn,
                RoleSessionName="S3MPAsyncUploadSession",
            )
        creds = assumed["Credentials"]
        # Rebuild the session so the S3 resource below uses the role's credentials.
        session = aioboto3.Session(
            aws_access_key_id=creds["AccessKeyId"],
            aws_secret_access_key=creds["SecretAccessKey"],
            aws_session_token=creds["SessionToken"],
        )
    async with session.resource("s3") as s3_resource:
        # aioboto3 resource factories are coroutines: without the await,
        # ``bucket`` would be a coroutine object and ``upload_file`` would fail.
        bucket = await s3_resource.Bucket(mirror_path.bucket_key)
        await bucket.upload_file(str(mirror_path.local_path), mirror_path.s3_key)


Expand All @@ -22,7 +34,7 @@ def upload_from_mirror_thread(
) -> Coroutine:
"""Upload from mirror on a separate thread."""
return asyncio.to_thread(
S3MPConfig.bucket.upload_file,
mirror_path.bucket.upload_file,
str(mirror_path.local_path),
mirror_path.s3_key,
Callback=S3MPConfig.callback,
Expand Down
21 changes: 13 additions & 8 deletions S3MP/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,27 @@ def __init__(
"""
if transfer_objs is None:
return
if not isinstance(transfer_objs, list):
transfer_objs = [transfer_objs]

# Fall back to global defaults for non-MirrorPath objects
if resource is None:
resource = S3MPConfig.s3_resource
if bucket_key is None:
bucket_key = S3MPConfig.default_bucket_key
if not isinstance(transfer_objs, list):
transfer_objs = [transfer_objs]

self._total_bytes = 0
for transfer_mapping in transfer_objs:
if is_download:
s3_key = str(
transfer_mapping.s3_key
if isinstance(transfer_mapping, MirrorPath)
else transfer_mapping
)
self._total_bytes += resource.Object(bucket_key, s3_key).content_length
if isinstance(transfer_mapping, MirrorPath):
mp_resource = transfer_mapping.session.s3_resource
mp_bucket_key = transfer_mapping.bucket_key
s3_key = transfer_mapping.s3_key
else:
mp_resource = resource
mp_bucket_key = bucket_key
s3_key = str(transfer_mapping)
self._total_bytes += mp_resource.Object(mp_bucket_key, s3_key).content_length
else:
local_path = (
transfer_mapping.local_path
Expand Down
146 changes: 90 additions & 56 deletions S3MP/global_config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Set global values for S3MP module."""

from __future__ import annotations

import tempfile
from collections.abc import Callable
from configparser import ConfigParser
Expand All @@ -13,6 +15,58 @@
from S3MP.types import S3Bucket, S3Client, S3Resource, S3TransferConfig


@dataclass
class S3Session:
    """Cached boto3 S3 client/resource pair for a single credential context.

    Attributes:
        s3_client: boto3 S3 client for this credential context.
        s3_resource: boto3 S3 resource for the same credential context.
        _bucket_map: Cache of Bucket objects keyed by bucket name; created
            lazily on the first ``get_bucket`` call.
    """

    s3_client: S3Client
    s3_resource: S3Resource
    _bucket_map: dict[str, S3Bucket] | None = None

    @classmethod
    def from_role_arn(
        cls,
        role_arn: str,
        boto3_config: Config | None = None,
        session_name: str = "S3MPAssumeRoleSession",
    ) -> S3Session:
        """Create a session by assuming an IAM role through STS.

        The temporary credentials returned by STS are passed directly to the
        client and resource; nothing in this class refreshes them afterwards.

        Args:
            role_arn: ARN of the IAM role to assume.
            boto3_config: botocore config applied to both the client and the
                resource. A default ``Config()`` is used when omitted.
            session_name: ``RoleSessionName`` sent with the assume-role call.

        Returns:
            A session whose client and resource use the assumed role's
            credentials.
        """
        credentials = boto3.client("sts").assume_role(
            RoleArn=role_arn, RoleSessionName=session_name
        )["Credentials"]
        # Client and resource must share identical credentials and config, so
        # build the keyword arguments once and reuse them for both.
        shared_kwargs = {
            "aws_access_key_id": credentials["AccessKeyId"],
            "aws_secret_access_key": credentials["SecretAccessKey"],
            "aws_session_token": credentials["SessionToken"],
            "config": boto3_config or Config(),
        }
        return cls(
            s3_client=boto3.client("s3", **shared_kwargs),
            s3_resource=boto3.resource("s3", **shared_kwargs),
        )

    @classmethod
    def no_role(cls, boto3_config: Config | None = None) -> S3Session:
        """Create a session without assuming an IAM role.

        Args:
            boto3_config: botocore config applied to both the client and the
                resource. A default ``Config()`` is used when omitted.

        Returns:
            A session whose client and resource are built with no explicit
            credentials passed in.
        """
        cfg = boto3_config or Config()
        return cls(
            s3_client=boto3.client("s3", config=cfg),
            s3_resource=boto3.resource("s3", config=cfg),
        )

    def get_bucket(self, bucket_key: str) -> S3Bucket:
        """Return the Bucket object for ``bucket_key``, creating it on first use.

        Args:
            bucket_key: Name of the S3 bucket.

        Returns:
            The cached Bucket object obtained from this session's resource.
        """
        if self._bucket_map is None:
            self._bucket_map = {}
        if bucket_key not in self._bucket_map:
            self._bucket_map[bucket_key] = self.s3_resource.Bucket(bucket_key)
        return self._bucket_map[bucket_key]


def get_config_file_path() -> Path:
"""Get the location of the config file."""
root_module_folder = Path(__file__).parent.resolve()
Expand All @@ -33,10 +87,8 @@ def __call__(cls, *args, **kwargs):
class _S3MPConfigClass(metaclass=Singleton):
"""Singleton class for S3MP globals."""

# Boto3 Objects
_s3_client: S3Client | None = None
_s3_resource: S3Resource | None = None
_bucket: S3Bucket | None = None
# Session registry: maps role ARN -> S3Session (None key = default session)
_session_map: dict[str | None, S3Session] | None = None
_boto3_config: Config | None = None

# Config Items
Expand All @@ -50,32 +102,28 @@ class _S3MPConfigClass(metaclass=Singleton):
callback: Callable | None = None
use_async_global_thread_queue: bool = True

def get_session(self, role_arn: str | None = None) -> S3Session:
    """Return the cached S3Session for ``role_arn``, creating it on first use.

    Args:
        role_arn: IAM role ARN to assume. ``None`` selects the session that
            is built without assuming a role.

    Returns:
        The session stored in the registry under ``role_arn``.
    """
    if self._session_map is None:
        self._session_map = {}

    if role_arn not in self._session_map:
        if role_arn is None:
            session = S3Session.no_role(self.boto3_config)
        else:
            session = S3Session.from_role_arn(role_arn, self.boto3_config)
        self._session_map[role_arn] = session

    return self._session_map[role_arn]

def assume_role(self, role_arn: str) -> None:
    """Make ``role_arn`` the default IAM role for the global config.

    Args:
        role_arn: ARN of the IAM role that the default session should use.
    """
    # Build and cache the session first: if assuming the role raises, the
    # previously configured default role is left untouched.
    self.get_session(role_arn)
    self._iam_role_arn = role_arn

@property
def default_bucket_key(self) -> str:
Expand All @@ -89,14 +137,10 @@ def default_bucket_key(self) -> str:
def set_default_bucket_key(self, bucket_key: str) -> None:
"""Set default bucket key."""
self._default_bucket_key = bucket_key
# Clear cached bucket
self._bucket = None

def clear_boto3_cache(self) -> None:
    """Clear cached boto3 sessions, config, and buckets.

    Resetting the session registry drops every cached session, and with it
    the Bucket objects each session had cached.
    """
    self._session_map = {}
    self._boto3_config = None

@property
Expand All @@ -123,34 +167,24 @@ def boto3_config(self) -> Config:
return self._boto3_config

@property
def default_session(self) -> S3Session:
    """Get the default session.

    Uses the default IAM role when one has been set; otherwise the session
    built without assuming a role.
    """
    return self.get_session(self._iam_role_arn)

@property
def s3_client(self) -> S3Client:
    """Get the S3 client from the default session."""
    return self.default_session.s3_client

@property
def s3_resource(self) -> S3Resource:
    """Get the S3 resource from the default session."""
    return self.default_session.s3_resource

def get_bucket(self, bucket_key: str | None = None) -> S3Bucket:
    """Get a boto3 Bucket object from the default session.

    Args:
        bucket_key: Name of the bucket. A falsy value (``None`` or ``""``)
            selects ``default_bucket_key``.

    Returns:
        The Bucket object returned by the default session's ``get_bucket``.
    """
    bucket_key = bucket_key or self.default_bucket_key
    return self.default_session.get_bucket(bucket_key)

@property
def bucket(self) -> S3Bucket:
Expand Down
Loading
Loading