|
21 | 21 | import numpy as np |
22 | 22 | import pandas as pd |
23 | 23 | import requests |
| 24 | +import tempfile |
24 | 25 | from boto3.exceptions import Boto3Error |
25 | 26 | from lib.app.analytics.common import aggregate_annotations_as_df |
26 | 27 | from lib.app.analytics.common import consensus_plot |
@@ -3966,51 +3967,76 @@ def validate_project_type(self): |
3966 | 3967 | constances.LIMITED_FUNCTIONS[self._project.project_type] |
3967 | 3968 | ) |
3968 | 3969 |
|
def upload_to_s3_from_folder(self, folder_path: str):
    """Upload every file found (recursively) under *folder_path* to S3.

    Files are pushed concurrently (up to 10 worker threads) into the
    bucket named by ``self._to_s3_bucket``, keyed as
    ``{self._folder_path}/{file name}``.

    :param folder_path: local directory whose contents are uploaded
    :raises Exception: re-raises the first upload error, if any
    """
    to_s3_bucket = boto3.Session().resource("s3").Bucket(self._to_s3_bucket)
    # NOTE(review): "*.*" only matches names containing a dot, so
    # extension-less files are skipped — presumably intentional, TODO confirm.
    files_to_upload = list(Path(folder_path).rglob("*.*"))

    def _upload_file_to_s3(_to_s3_bucket, _path, _s3_key) -> None:
        _to_s3_bucket.upload_file(_path, _s3_key)

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        results = []
        for path in files_to_upload:
            s3_key = f"{self._folder_path}/{path.name}"
            results.append(
                executor.submit(_upload_file_to_s3, to_s3_bucket, str(path), s3_key)
            )
        # Bug fix: previously the futures were collected but never inspected,
        # so any failed upload was silently swallowed.  Propagate the first
        # error by resolving every future as it completes.
        for future in concurrent.futures.as_completed(results):
            future.result()
|
def download_to_local_storage(self, destination: str):
    """Download the named export archive into *destination*.

    Looks up the export called ``self._export_name``, polls the server every
    5 seconds until it finishes building, streams the archive to disk, and
    (when ``self._extract_zip_contents`` is set) unzips it in place and
    removes the archive.

    :param destination: local directory to download into
    :return: tuple of (export id, downloaded archive path, destination dir)
    :raises AppException: if the export does not exist or ends in
        ERROR/CANCELED state
    """
    exports = self._service.get_exports(
        team_id=self._project.team_id, project_id=self._project.uuid
    )
    export = next(filter(lambda i: i["name"] == self._export_name, exports), None)
    # Bug fix: this guard must run BEFORE export["id"] is dereferenced;
    # previously a missing export raised TypeError ('NoneType' is not
    # subscriptable) instead of the intended AppException.
    if not export:
        raise AppException("Export not found.")
    export = self._service.get_export(
        team_id=self._project.team_id,
        project_id=self._project.uuid,
        export_id=export["id"],
    )

    while export["status"] != ExportStatus.COMPLETE.value:
        # Bug fix: check for a failed export before sleeping, so an export
        # already in a terminal error state raises immediately instead of
        # after a pointless 5-second wait and refetch.
        if export["status"] in (ExportStatus.ERROR.value, ExportStatus.CANCELED.value):
            raise AppException("Couldn't download export.")
        logger.info("Waiting 5 seconds for export to finish on server.")
        time.sleep(5)
        export = self._service.get_export(
            team_id=self._project.team_id,
            project_id=self._project.uuid,
            export_id=export["id"],
        )

    filename = Path(export["path"]).name
    filepath = Path(destination) / filename
    # Stream the archive in 8 KiB chunks so large exports don't sit in memory.
    with requests.get(export["download"], stream=True) as response:
        response.raise_for_status()
        with open(filepath, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    if self._extract_zip_contents:
        with zipfile.ZipFile(filepath, "r") as f:
            f.extractall(destination)
        # The extracted files replace the archive itself.
        Path.unlink(filepath)
    return export["id"], filepath, destination
|
def execute(self):
    """Run the export download, routing output to S3 or local storage.

    With ``self._to_s3_bucket`` set, the export is fetched into a temporary
    directory and re-uploaded to the bucket; otherwise it lands in
    ``self._folder_path`` directly.

    :return: ``self._response``
    """
    if not self.is_valid():
        return self._response

    if self._to_s3_bucket:
        with tempfile.TemporaryDirectory() as tmp:
            self.download_to_local_storage(tmp)
            self.upload_to_s3_from_folder(tmp)
            logger.info(f"Exported to AWS {self._to_s3_bucket}/{self._folder_path}")
        return self._response

    export_id, filepath, destination = self.download_to_local_storage(self._folder_path)
    message = (
        f"Extracted {filepath} to folder {destination}"
        if self._extract_zip_contents
        else f"Downloaded export ID {export_id} to {filepath}"
    )
    logger.info(message)
    return self._response
4015 | 4041 |
|
4016 | 4042 |
|
|
0 commit comments