Skip to content

Commit 3b8d306

Browse files
authored
Merge pull request #491 from superannotateai/1297_download_large_files
1297 download large files
2 parents 7c26ca6 + 7a44e38 commit 3b8d306

File tree

2 files changed

+232
-65
lines changed

2 files changed

+232
-65
lines changed

src/superannotate/lib/core/usecases/annotations.py

Lines changed: 133 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,8 @@ def __init__(
992992
self._classes = classes
993993
self._callback = callback
994994
self._images = images
995+
self._big_file_queues = []
996+
self._small_file_queues = []
995997

996998
def validate_item_names(self):
997999
if self._item_names:
@@ -1052,7 +1054,105 @@ def coroutine_wrapper(coroutine):
10521054
loop.close()
10531055
return count
10541056

1057+
async def _download_big_annotation(self, item, export_path, folder_id):
    """Fetch one large-annotation item through the backend client.

    :param item: item descriptor as returned by the size-classification call
    :param export_path: base directory annotations are written under
    :param folder_id: backend id of the folder the item belongs to
    """
    # Non-root folders get their own sub-directory under the export path.
    destination = export_path
    if not self._folder.is_root:
        destination = f"{export_path}/{self._folder.name}"
    await self._backend_client.download_big_annotation(
        item=item,
        team_id=self._project.team_id,
        project_id=self._project.id,
        folder_id=folder_id,
        reporter=self.reporter,
        download_path=destination,
        postfix=self.get_postfix(),
        callback=self._callback,
    )
1070+
async def download_big_annotations(self, queue_idx, export_path, folder_id):
    """Worker coroutine: drain the big-file queue until the ``None`` sentinel.

    Several of these workers may consume the same queue concurrently; on
    seeing the sentinel each worker re-enqueues it so its siblings can
    terminate as well.
    """
    queue = self._big_file_queues[queue_idx]
    while True:
        item = await queue.get()
        queue.task_done()
        if not item:
            # Propagate the shutdown signal to the other workers.
            queue.put_nowait(None)
            break
        await self._download_big_annotation(item, export_path, folder_id)
1080+
1081+
async def download_small_annotations(self, queue_idx, export_path, folder_id):
    """Worker coroutine: collect small-item names from the queue, then
    download them in a single batched backend call.

    Consumes items until the ``None`` sentinel is received.  If the folder
    produced no small items at all (e.g. it contains only large
    annotations), the backend call is skipped instead of being issued with
    an empty item list.
    """
    queue = self._small_file_queues[queue_idx]
    items = []
    while True:
        item = await queue.get()
        if item is None:
            break
        if item:
            items.append(item)
    if not items:
        # Nothing small in this folder — avoid a pointless backend request.
        return
    await self._backend_client.download_small_annotations(
        team_id=self._project.team_id,
        project_id=self._project.id,
        folder_id=folder_id,
        items=items,
        reporter=self.reporter,
        download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}",
        postfix=self.get_postfix(),
        callback=self._callback,
    )
1101+
1102+
async def distribute_to_queues(
    self, item_names, sm_queue_id, l_queue_id, folder_id
):
    """Classify items by size and feed the big/small download queues.

    The ``finally`` clause guarantees a ``None`` sentinel is enqueued on
    both queues — even when classification fails — so the consumer workers
    always terminate.
    """
    big_queue = self._big_file_queues[l_queue_id]
    small_queue = self._small_file_queues[sm_queue_id]
    try:
        response = self._backend_client.sort_items_by_size(
            item_names, self._project.team_id, self._project.id, folder_id
        )
        # Large items are forwarded whole; small ones only by name.
        for entry in response["large"]:
            await big_queue.put(entry)
        for entry in response["small"]:
            await small_queue.put(entry["name"])
    finally:
        await big_queue.put(None)
        await small_queue.put(None)
1121+
1122+
async def run_workers(self, item_names, folder_id, export_path):
    """Spawn the distributor plus the download workers for one folder.

    Three workers drain the big-file queue concurrently (each big download
    is a multi-step backend exchange) and one worker batches the small
    files.  ``return_exceptions=True`` keeps one failing worker from
    cancelling the rest; failures are logged rather than raised.
    """
    try:
        self._big_file_queues.append(asyncio.Queue())
        self._small_file_queues.append(asyncio.Queue())
        small_file_queue_idx = len(self._small_file_queues) - 1
        big_file_queue_idx = len(self._big_file_queues) - 1
        res = await asyncio.gather(
            self.distribute_to_queues(
                item_names, small_file_queue_idx, big_file_queue_idx, folder_id
            ),
            self.download_big_annotations(
                big_file_queue_idx, export_path, folder_id
            ),
            self.download_big_annotations(
                big_file_queue_idx, export_path, folder_id
            ),
            self.download_big_annotations(
                big_file_queue_idx, export_path, folder_id
            ),
            self.download_small_annotations(
                small_file_queue_idx, export_path, folder_id
            ),
            return_exceptions=True,
        )
        # Only genuine exceptions count as failures: `any(res)` would also
        # flag a worker that happened to return a truthy value.
        errors = [i for i in res if isinstance(i, Exception)]
        if errors:
            self.reporter.log_error(f"Error {str(errors)}")
    except Exception as e:
        self.reporter.log_error(f"Error {str(e)}")
1151+
def per_folder_execute(self, item_names, folder_id, export_path):
    """Synchronous per-folder entry point, intended to be submitted to a
    thread pool: each call runs the async worker group on its own fresh
    event loop via ``asyncio.run``."""
    asyncio.run(self.run_workers(item_names, folder_id, export_path))
1153+
10551154
def execute(self):
1155+
10561156
if self.is_valid():
10571157
export_path = str(
10581158
self.destination
@@ -1064,77 +1164,52 @@ def execute(self):
10641164
f"Downloading the annotations of the requested items to {export_path}\nThis might take a while…"
10651165
)
10661166
self.reporter.start_spinner()
1167+
10671168
folders = []
10681169
if self._folder.is_root and self._recursive:
10691170
folders = self._folders.get_all(
10701171
Condition("team_id", self._project.team_id, EQ)
10711172
& Condition("project_id", self._project.id, EQ),
10721173
)
10731174
folders.append(self._folder)
1074-
postfix = self.get_postfix()
1075-
import nest_asyncio
1076-
import platform
1077-
1078-
if platform.system().lower() == "windows":
1079-
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
1080-
1081-
nest_asyncio.apply()
10821175

10831176
if not folders:
1084-
loop = asyncio.new_event_loop()
1085-
if not self._item_names:
1086-
condition = (
1087-
Condition("team_id", self._project.team_id, EQ)
1088-
& Condition("project_id", self._project.id, EQ)
1089-
& Condition("folder_id", self._folder.uuid, EQ)
1090-
)
1091-
item_names = [item.name for item in self._images.get_all(condition)]
1092-
else:
1093-
item_names = self._item_names
1094-
count = loop.run_until_complete(
1095-
self._backend_client.download_annotations(
1096-
team_id=self._project.team_id,
1097-
project_id=self._project.id,
1098-
folder_id=self._folder.uuid,
1099-
items=item_names,
1100-
reporter=self.reporter,
1101-
download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}",
1102-
postfix=postfix,
1103-
callback=self._callback,
1104-
)
1105-
)
1106-
else:
1107-
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
1108-
coroutines = []
1109-
for folder in folders:
1110-
if not self._item_names:
1111-
condition = (
1112-
Condition("team_id", self._project.team_id, EQ)
1113-
& Condition("project_id", self._project.id, EQ)
1114-
& Condition("folder_id", folder.uuid, EQ)
1115-
)
1116-
item_names = [
1117-
item.name for item in self._images.get_all(condition)
1118-
]
1119-
else:
1120-
item_names = self._item_names
1121-
coroutines.append(
1122-
self._backend_client.download_annotations(
1123-
team_id=self._project.team_id,
1124-
project_id=self._project.id,
1125-
folder_id=folder.uuid,
1126-
items=item_names,
1127-
reporter=self.reporter,
1128-
download_path=f"{export_path}{'/' + folder.name if not folder.is_root else ''}", # noqa
1129-
postfix=postfix,
1130-
callback=self._callback,
1131-
)
1177+
folders.append(self._folder)
1178+
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
1179+
futures = []
1180+
for folder in folders:
1181+
if not self._item_names:
1182+
condition = (
1183+
Condition("team_id", self._project.team_id, EQ)
1184+
& Condition("project_id", self._project.id, EQ)
1185+
& Condition("folder_id", folder.uuid, EQ)
11321186
)
1133-
count = sum(
1134-
[i for i in executor.map(self.coroutine_wrapper, coroutines)]
1187+
item_names = [
1188+
item.name for item in self._images.get_all(condition)
1189+
]
1190+
else:
1191+
item_names = self._item_names
1192+
1193+
new_export_path = export_path
1194+
if folder.name != "root":
1195+
new_export_path += f"/{folder.name}"
1196+
1197+
# TODO check
1198+
if not item_names:
1199+
continue
1200+
future = executor.submit(
1201+
self.per_folder_execute,
1202+
item_names,
1203+
folder.uuid,
1204+
new_export_path,
11351205
)
1206+
futures.append(future)
1207+
1208+
for future in concurrent.futures.as_completed(futures):
1209+
print(future.result())
11361210

11371211
self.reporter.stop_spinner()
1212+
count = self.get_items_count(export_path)
11381213
self.reporter.log_info(f"Downloaded annotations for {count} items.")
11391214
self.download_annotation_classes(export_path)
11401215
self._response.data = os.path.abspath(export_path)

src/superannotate/lib/infrastructure/services.py

Lines changed: 99 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import time
99
from contextlib import contextmanager
1010
from functools import lru_cache
11+
from pathlib import Path
1112
from typing import Any
1213
from typing import Callable
1314
from typing import Dict
@@ -18,7 +19,7 @@
1819
from urllib.parse import urljoin
1920

2021
import aiohttp
21-
import lib.core as constance
22+
import lib.core as constants
2223
import requests.packages.urllib3
2324
from lib.core import entities
2425
from lib.core.entities import BaseItemEntity
@@ -55,6 +56,7 @@ class BaseBackendService(SuperannotateServiceProvider):
5556
PAGINATE_BY = 100
5657
LIMIT = 100
5758
MAX_ITEMS_COUNT = 50 * 1000
59+
ASSETS_PROVIDER_VERSION = "v2"
5860

5961
"""
6062
Base service class
@@ -80,9 +82,9 @@ def __init__(
8082

8183
@property
def assets_provider_url(self):
    """Base URL of the assets-provider service matching the configured
    backend (production vs. dev), versioned by ``ASSETS_PROVIDER_VERSION``."""
    host = (
        "assets-provider.superannotate.com"
        if self.api_url == constants.BACKEND_URL
        else "assets-provider.devsuperannotate.com"
    )
    return f"https://{host}/api/{self.ASSETS_PROVIDER_VERSION}/"
8688

8789
@lru_cache(maxsize=32)
8890
def _get_session(self, thread_id, ttl=None): # noqa
@@ -155,6 +157,7 @@ def _request(
155157
)
156158
prepared = session.prepare_request(req)
157159
response = session.send(request=prepared, verify=self._verify_ssl)
160+
158161
if files:
159162
session.headers.update(self.default_headers)
160163
if response.status_code == 404 and retried < 3:
@@ -309,6 +312,10 @@ class SuperannotateBackendService(BaseBackendService):
309312
URL_START_FILE_SEND_FINISH = "items/{item_id}/annotations/upload/multipart/finish"
310313
URL_START_FILE_SYNC = "items/{item_id}/annotations/sync"
311314
URL_START_FILE_SYNC_STATUS = "items/{item_id}/annotations/sync/status"
315+
URL_CLASSIFY_ITEM_SIZE = "items/annotations/download/method"
316+
URL_SYNC_LARGE_ANNOTATION = "items/{item_id}/annotations/sync"
317+
URL_SYNC_LARGE_ANNOTATION_STATUS = "items/{item_id}/annotations/sync/status"
318+
URL_DOWNLOAD_LARGE_ANNOTATION = "items/{item_id}/annotations/download"
312319

313320
def upload_priority_scores(
314321
self, team_id: int, project_id: int, folder_id: int, priorities: list
@@ -657,7 +664,7 @@ def prepare_export(
657664
prepare_export_url = urljoin(self.api_url, self.URL_PREPARE_EXPORT)
658665

659666
annotation_statuses = ",".join(
660-
[str(constance.AnnotationStatus.get_value(i)) for i in annotation_statuses]
667+
[str(constants.AnnotationStatus.get_value(i)) for i in annotation_statuses]
661668
)
662669

663670
data = {
@@ -1145,7 +1152,76 @@ def get_annotations(
11451152
)
11461153
)
11471154

1148-
async def download_annotations(
1155+
async def download_big_annotation(
    self,
    project_id: int,
    team_id: int,
    folder_id: int,
    reporter: Reporter,
    download_path: str,
    postfix: str,
    item: dict,
    callback: Callable = None,
):
    """Download one large annotation: trigger a backend-side sync, poll it
    to completion, then fetch the annotation JSON and write it to
    ``download_path / (item_name + postfix)``.

    :param item: item descriptor with at least ``id`` and ``name`` keys
        (the original ``int`` annotation was incorrect — the body indexes
        it as a mapping)
    :param postfix: filename suffix appended to the item name

    NOTE(review): ``folder_id``, ``reporter`` and ``callback`` are accepted
    but never used in this body — presumably kept for signature parity with
    the small-annotation path; confirm before removing.
    """
    item_id = item["id"]
    item_name = item["name"]
    # Query parameters for the final download request.
    query_params = {
        "team_id": team_id,
        "project_id": project_id,
        "annotation_type": "MAIN",
        "version": "V1.00",
    }

    url = urljoin(
        self.assets_provider_url,
        self.URL_DOWNLOAD_LARGE_ANNOTATION.format(item_id=item_id),
    )

    # Parameters asking the provider to transform the annotation into the
    # "export" representation before it is downloaded.
    sync_params = {
        "team_id": team_id,
        "project_id": project_id,
        "desired_transform_version": "export",
        "desired_version": "V1.00",
        "current_transform_version": "V1.00",
        "current_source": "main",
        "desired_source": "secondary",
    }

    sync_url = urljoin(
        self.assets_provider_url,
        self.URL_SYNC_LARGE_ANNOTATION.format(item_id=item_id),
    )

    # Kick off the server-side sync.  NOTE(review): the response is never
    # checked — a failed sync start is only noticed by the poll below.
    res = self._request(url=sync_url, params=sync_params, method="POST")

    # The status endpoint takes the same params minus the source fields.
    sync_params.pop("current_source")
    sync_params.pop("desired_source")

    synced = False

    sync_status_url = urljoin(
        self.assets_provider_url,
        self.URL_SYNC_LARGE_ANNOTATION_STATUS.format(item_id=item_id),
    )
    # Poll once per second until the sync reports "SUCCESS".
    # NOTE(review): no timeout and no handling of a terminal failure
    # status — if the sync never succeeds this loops forever.  The 1s
    # sleep is async, but each status poll uses the synchronous
    # self._request and therefore blocks the event loop briefly.
    while synced != "SUCCESS":
        synced = self._request(
            url=sync_status_url, params=sync_params, method="GET"
        ).json()["status"]
        await asyncio.sleep(1)

    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=False),
        headers=self.default_headers,
    ) as session:
        # NOTE(review): response status is not checked, and the whole
        # annotation is parsed into memory before being re-serialized —
        # despite this being the "large file" path.  Confirm whether the
        # endpoint supports streaming the body instead.
        start_response = await session.post(url, params=query_params)
        res = await start_response.json()
        Path(download_path).mkdir(exist_ok=True, parents=True)

        dest_path = Path(download_path) / (item_name + postfix)
        with open(dest_path, "w") as fp:
            json.dump(res, fp)
1223+
1224+
async def download_small_annotations(
11491225
self,
11501226
project_id: int,
11511227
team_id: int,
@@ -1155,7 +1231,7 @@ async def download_annotations(
11551231
postfix: str,
11561232
items: List[str] = None,
11571233
callback: Callable = None,
1158-
) -> int:
1234+
):
11591235

11601236
query_params = {
11611237
"team_id": team_id,
@@ -1503,3 +1579,19 @@ def get_schema(
15031579
},
15041580
content_type=ServiceResponse,
15051581
)
1582+
1583+
def sort_items_by_size(self, item_names, team_id, project_id, folder_id):
    """Ask the assets provider to classify items into ``large`` and
    ``small`` download groups.

    :return: parsed JSON response with ``large`` and ``small`` keys
    :raises AppException: when the provider responds with a non-2xx status
    """
    response = self._request(
        url=urljoin(self.assets_provider_url, self.URL_CLASSIFY_ITEM_SIZE),
        method="POST",
        params={
            "team_id": team_id,
            "project_id": project_id,
            "folder_id": folder_id,
        },
        # folder_id is sent both as a query param and in the body,
        # mirroring what the provider endpoint expects.
        data={"item_names": item_names, "folder_id": folder_id},
    )
    if not response.ok:
        raise AppException(response.json().get("errors", "Undefined"))
    return response.json()

0 commit comments

Comments
 (0)