5 | 5 | import logging |
6 | 6 | import os.path |
7 | 7 | import random |
| 8 | +import tempfile |
8 | 9 | import time |
9 | 10 | import uuid |
10 | 11 | import zipfile |
@@ -4012,58 +4013,92 @@ def __init__( |
4012 | 4013 | self._folder_path = folder_path |
4013 | 4014 | self._extract_zip_contents = extract_zip_contents |
4014 | 4015 | self._to_s3_bucket = to_s3_bucket |
| 4016 | + self._temp_dir = None |
4015 | 4017 |
4016 | 4018 | def validate_project_type(self): |
4017 | 4019 | if self._project.project_type in constances.LIMITED_FUNCTIONS: |
4018 | 4020 | raise AppValidationException( |
4019 | 4021 | constances.LIMITED_FUNCTIONS[self._project.project_type] |
4020 | 4022 | ) |
4021 | 4023 |
4022 | | - def execute(self): |
4023 | | - if self.is_valid(): |
4024 | | - exports = self._service.get_exports( |
4025 | | - team_id=self._project.team_id, project_id=self._project.uuid |
4026 | | - ) |
4027 | | - export_id = None |
4028 | | - for export in exports: |
4029 | | - if export["name"] == self._export_name: |
4030 | | - export_id = export["id"] |
4031 | | - break |
4032 | | - if not export_id: |
4033 | | - raise AppException("Export not found.") |
| 4024 | + def upload_to_s3_from_folder(self, folder_path: str): |
| 4025 | + to_s3_bucket = boto3.Session().resource("s3").Bucket(self._to_s3_bucket) |
| 4026 | + files_to_upload = list(Path(folder_path).rglob("*.*")) |
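| | + # NOTE: rglob("*.*") only matches names that contain a dot, so any |
| | + # extensionless files under folder_path are silently skipped. |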
4034 | 4027 |
4035 | | - while True: |
4036 | | - export = self._service.get_export( |
4037 | | - team_id=self._project.team_id, |
4038 | | - project_id=self._project.uuid, |
4039 | | - export_id=export_id, |
| 4028 | + def _upload_file_to_s3(_to_s3_bucket, _path, _s3_key) -> None: |
| 4029 | + _to_s3_bucket.upload_file(_path, _s3_key) |
| 4030 | + |
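| | + # NOTE: a single Bucket resource is shared by all worker threads below; |
| | + # boto3 resources are not thread-safe per the boto3 docs, so a |
| | + # per-thread session (or a low-level client) may be safer here. |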
| 4031 | + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: |
| 4032 | + results = [] |
| 4033 | + for path in files_to_upload: |
| 4034 | + s3_key = f"{self._folder_path}/{path.name}" |
| 4035 | + results.append( |
| 4036 | + executor.submit(_upload_file_to_s3, to_s3_bucket, str(path), s3_key) |
4040 | 4037 | ) |
| | + # Wait on the futures so worker exceptions surface, yielding once |
| | + # per completed upload for progress reporting. |
| 4038 | + for future in concurrent.futures.as_completed(results): |
| | + future.result() |
| | + yield |
4041 | 4039 |
4042 | | - if export["status"] == ExportStatus.IN_PROGRESS.value: |
4043 | | - logger.info("Waiting 5 seconds for export to finish on server.") |
4044 | | - time.sleep(5) |
4045 | | - continue |
4046 | | - if export["status"] == ExportStatus.ERROR.value: |
4047 | | - raise AppException("Couldn't download export.") |
4048 | | - pass |
4049 | | - break |
4050 | | - |
4051 | | - filename = Path(export["path"]).name |
4052 | | - filepath = Path(self._folder_path) / filename |
4053 | | - with requests.get(export["download"], stream=True) as r: |
4054 | | - r.raise_for_status() |
4055 | | - with open(filepath, "wb") as f: |
4056 | | - for chunk in r.iter_content(chunk_size=8192): |
4057 | | - f.write(chunk) |
4058 | | - if self._extract_zip_contents: |
4059 | | - with zipfile.ZipFile(filepath, "r") as f: |
4060 | | - f.extractall(self._folder_path) |
4061 | | - Path.unlink(filepath) |
4062 | | - logger.info(f"Extracted {filepath} to folder {self._folder_path}") |
4063 | | - else: |
4064 | | - logger.info(f"Downloaded export ID {export['id']} to {filepath}") |
| 4040 | + def download_to_local_storage(self, destination: str): |
| 4041 | + exports = self._service.get_exports( |
| 4042 | + team_id=self._project.team_id, project_id=self._project.uuid |
| 4043 | + ) |
| 4044 | + export = next(filter(lambda i: i["name"] == self._export_name, exports), None) |
| 4045 | + if not export: |
| 4046 | + raise AppException("Export not found.") |
| 4047 | + export = self._service.get_export( |
| 4048 | + team_id=self._project.team_id, |
| 4049 | + project_id=self._project.uuid, |
| 4050 | + export_id=export["id"], |
| 4051 | + ) |
| 4052 | + export_status = export["status"] |
| 4053 | + |
| | + # Poll until the export reaches a terminal state, failing fast on |
| | + # ERROR or CANCELED instead of sleeping first. |
| 4054 | + while export_status != ExportStatus.COMPLETE.value: |
| 4055 | + if export_status in (ExportStatus.ERROR.value, ExportStatus.CANCELED.value): |
| 4056 | + raise AppException("Couldn't download export.") |
| 4057 | + logger.info("Waiting 5 seconds for export to finish on server.") |
| 4058 | + time.sleep(5) |
| 4059 | + export = self._service.get_export( |
| 4060 | + team_id=self._project.team_id, |
| 4061 | + project_id=self._project.uuid, |
| 4062 | + export_id=export["id"], |
| 4063 | + ) |
| 4064 | + export_status = export["status"] |
| 4066 | + |
| 4067 | + filename = Path(export["path"]).name |
| 4068 | + filepath = Path(destination) / filename |
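| | + # Stream the archive to disk in 8 KiB chunks rather than buffering |
| | + # the whole export in memory. |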
| 4069 | + with requests.get(export["download"], stream=True) as response: |
| 4070 | + response.raise_for_status() |
| 4071 | + with open(filepath, "wb") as f: |
| 4072 | + for chunk in response.iter_content(chunk_size=8192): |
| 4073 | + f.write(chunk) |
| 4074 | + if self._extract_zip_contents: |
| 4075 | + with zipfile.ZipFile(filepath, "r") as zip_file: |
| 4076 | + zip_file.extractall(destination) |
| 4077 | + filepath.unlink() |
| 4078 | + return export["id"], filepath, destination |
| 4079 | + |
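| | + # Downloads the export into a lazily created temporary directory (at |
| | + # most once) and returns the file count, e.g. to size a progress bar. |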
| 4080 | + def get_upload_files_count(self): |
| 4081 | + if not self._temp_dir: |
| 4082 | + self._temp_dir = tempfile.TemporaryDirectory() |
| 4083 | + self.download_to_local_storage(self._temp_dir.name) |
| 4084 | + return len(list(Path(self._temp_dir.name).rglob("*.*"))) |
4065 | 4085 |
4066 | | - self._response.data = self._folder_path |
| 4086 | + def execute(self): |
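| | + # execute() is a generator: the S3 branch yields once per uploaded |
| | + # file and the local branch yields once, so callers can report |
| | + # progress; the final value is self._response. |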
| 4087 | + if self.is_valid(): |
| 4088 | + if self._to_s3_bucket: |
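| | + # Called for its side effect: it populates self._temp_dir with the export. |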
| 4089 | + self.get_upload_files_count() |
| 4090 | + yield from self.upload_to_s3_from_folder(self._temp_dir.name) |
| 4091 | + logger.info(f"Exported to s3://{self._to_s3_bucket}/{self._folder_path}") |
| 4092 | + self._temp_dir.cleanup() |
| 4093 | + else: |
| 4094 | + export_id, filepath, destination = self.download_to_local_storage( |
| 4095 | + self._folder_path |
| 4096 | + ) |
| 4097 | + if self._extract_zip_contents: |
| 4098 | + logger.info(f"Extracted {filepath} to folder {destination}") |
| 4099 | + else: |
| 4100 | + logger.info(f"Downloaded export ID {export_id} to {filepath}") |
| 4101 | + yield |
4067 | 4102 | return self._response |
4068 | 4103 |
4069 | 4104 |