Skip to content

Commit 5c9ab1f

Browse files
authored
Merge pull request #638 from superannotateai/assets_provider_update
change assets_provider version
2 parents 8aae029 + e979b38 commit 5c9ab1f

File tree

6 files changed

+94
-70
lines changed

6 files changed

+94
-70
lines changed

src/superannotate/lib/core/serviceproviders.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ async def upload_small_annotations(
367367
self,
368368
project: entities.ProjectEntity,
369369
folder: entities.FolderEntity,
370-
items_name_file_map: Dict[str, io.StringIO],
370+
items_name_data_map: Dict[str, dict],
371371
) -> UploadAnnotationsResponse:
372372
raise NotImplementedError
373373

src/superannotate/lib/core/usecases/annotations.py

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
from lib.core.types import PriorityScoreEntity
4848
from lib.core.usecases.base import BaseReportableUseCase
4949
from lib.core.video_convertor import VideoFrameGenerator
50+
from lib.infrastructure.utils import divide_to_chunks
5051
from pydantic import BaseModel
5152

5253
logger = logging.getLogger("sa")
@@ -119,11 +120,6 @@ def get_or_raise(response: ServiceResponse):
119120
raise AppException(response.error)
120121

121122

122-
def divide_to_chunks(it, size):
123-
it = iter(it)
124-
return iter(lambda: tuple(islice(it, size)), ())
125-
126-
127123
def log_report(
128124
report: Report,
129125
):
@@ -148,7 +144,6 @@ class ItemToUpload(BaseModel):
148144
item: BaseItemEntity
149145
annotation_json: Optional[dict]
150146
path: Optional[str]
151-
file: Optional[io.StringIO]
152147
file_size: Optional[int]
153148
mask: Optional[io.BytesIO]
154149

@@ -186,7 +181,7 @@ async def upload_small_annotations(
186181
report: Report,
187182
callback: Callable = None,
188183
):
189-
async def upload(_chunk):
184+
async def upload(_chunk: List[ItemToUpload]):
190185
failed_annotations, missing_classes, missing_attr_groups, missing_attrs = (
191186
[],
192187
[],
@@ -197,7 +192,7 @@ async def upload(_chunk):
197192
response = await service_provider.annotations.upload_small_annotations(
198193
project=project,
199194
folder=folder,
200-
items_name_file_map={i.item.name: i.file for i in chunk},
195+
items_name_data_map={i.item.name: i.annotation_json for i in chunk},
201196
)
202197
if response.ok:
203198
if response.data.failed_items: # noqa
@@ -221,9 +216,9 @@ async def upload(_chunk):
221216
reporter.update_progress(len(chunk))
222217

223218
_size = 0
224-
chunk = []
219+
chunk: List[ItemToUpload] = []
225220
while True:
226-
item_data = await queue.get()
221+
item_data: ItemToUpload = await queue.get()
227222
queue.task_done()
228223
if not item_data:
229224
queue.put_nowait(None)
@@ -253,11 +248,14 @@ async def upload_big_annotations(
253248
):
254249
async def _upload_big_annotation(item_data: ItemToUpload) -> Tuple[str, bool]:
255250
try:
251+
buff = io.StringIO()
252+
json.dump(item_data.annotation_json, buff, allow_nan=False)
253+
buff.seek(0)
256254
is_uploaded = await service_provider.annotations.upload_big_annotation(
257255
project=project,
258256
folder=folder,
259257
item_id=item_data.item.id,
260-
data=item_data.file,
258+
data=buff,
261259
chunk_size=5 * 1024 * 1024,
262260
)
263261
if is_uploaded and callback:
@@ -335,15 +333,14 @@ async def distribute_queues(self, items_to_upload: List[ItemToUpload]):
335333
for idx, (item_to_upload, processed) in enumerate(data):
336334
if not processed:
337335
try:
338-
item_to_upload.file = io.StringIO()
336+
file = io.StringIO()
339337
json.dump(
340338
item_to_upload.annotation_json,
341-
item_to_upload.file,
339+
file,
342340
allow_nan=False,
343341
)
344-
item_to_upload.file.seek(0, os.SEEK_END)
345-
item_to_upload.file_size = item_to_upload.file.tell()
346-
item_to_upload.file.seek(0)
342+
file.seek(0, os.SEEK_END)
343+
item_to_upload.file_size = file.tell()
347344
while True:
348345
if item_to_upload.file_size > BIG_FILE_THRESHOLD:
349346
if self._big_files_queue.qsize() > 32:
@@ -723,13 +720,10 @@ async def distribute_queues(self, items_to_upload: List[ItemToUpload]):
723720
if not processed:
724721
try:
725722
(
726-
annotation,
723+
item_to_upload.annotation_json,
727724
item_to_upload.mask,
728725
item_to_upload.file_size,
729726
) = await self.get_annotation(item_to_upload.path)
730-
item_to_upload.file = io.StringIO()
731-
json.dump(annotation, item_to_upload.file, allow_nan=False)
732-
item_to_upload.file.seek(0)
733727
while True:
734728
if item_to_upload.file_size > BIG_FILE_THRESHOLD:
735729
if self._big_files_queue.qsize() > 32:
@@ -1018,7 +1012,7 @@ def execute(self):
10181012
self._service_provider.annotations.upload_small_annotations(
10191013
project=self._project,
10201014
folder=self._folder,
1021-
items_name_file_map={self._image.name: annotation_file},
1015+
items_name_data_map={self._image.name: annotation_json},
10221016
)
10231017
)
10241018
if response.ok:

src/superannotate/lib/infrastructure/services/annotation.py

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
from lib.core.serviceproviders import BaseAnnotationService
2020
from lib.infrastructure.stream_data_handler import StreamedAnnotations
2121
from pydantic import parse_obj_as
22+
from superannotate.lib.infrastructure.services.http_client import AIOHttpSession
2223

2324
logger = logging.getLogger("sa")
2425

2526

2627
class AnnotationService(BaseAnnotationService):
27-
ASSETS_PROVIDER_VERSION = "v2.01"
28+
ASSETS_PROVIDER_VERSION = "v3.01"
2829
DEFAULT_CHUNK_SIZE = 5000
2930

3031
URL_GET_ANNOTATIONS = "items/annotations/download"
@@ -71,13 +72,12 @@ async def _sync_large_annotation(self, team_id, project_id, item_id):
7172
self.assets_provider_url,
7273
self.URL_START_FILE_SYNC.format(item_id=item_id),
7374
)
74-
async with aiohttp.ClientSession(
75+
async with AIOHttpSession(
7576
connector=aiohttp.TCPConnector(ssl=False),
7677
headers=self.client.default_headers,
77-
raise_for_status=True,
7878
) as session:
79-
await session.post(sync_url, params=sync_params)
80-
79+
_response = await session.request("post", sync_url, params=sync_params)
80+
_response.raise_for_status()
8181
sync_params.pop("current_source")
8282
sync_params.pop("desired_source")
8383

@@ -115,12 +115,12 @@ async def get_big_annotation(
115115
team_id=project.team_id, project_id=project.id, item_id=item.id
116116
)
117117

118-
async with aiohttp.ClientSession(
118+
async with AIOHttpSession(
119119
connector=aiohttp.TCPConnector(ssl=False),
120120
headers=self.client.default_headers,
121-
raise_for_status=True,
122121
) as session:
123-
start_response = await session.post(url, params=query_params)
122+
start_response = await session.request("post", url, params=query_params)
123+
start_response.raise_for_status()
124124
large_annotation = await start_response.json()
125125

126126
reporter.update_progress()
@@ -162,8 +162,8 @@ def get_upload_chunks(
162162
response = self.client.request(
163163
url=urljoin(self.assets_provider_url, self.URL_CLASSIFY_ITEM_SIZE),
164164
method="POST",
165-
params={"project_id": project.id, "limit": len(item_ids)},
166-
data={"item_ids": item_ids},
165+
params={"limit": len(item_ids)},
166+
data={"project_id": project.id, "item_ids": item_ids},
167167
)
168168
if not response.ok:
169169
raise AppException(response.error)
@@ -198,12 +198,12 @@ async def download_big_annotation(
198198
team_id=project.team_id, project_id=project.id, item_id=item_id
199199
)
200200

201-
async with aiohttp.ClientSession(
201+
async with AIOHttpSession(
202202
connector=aiohttp.TCPConnector(ssl=False),
203203
headers=self.client.default_headers,
204-
raise_for_status=True,
205204
) as session:
206-
start_response = await session.post(url, params=query_params)
205+
start_response = await session.request("post", url, params=query_params)
206+
start_response.raise_for_status()
207207
res = await start_response.json()
208208
Path(download_path).mkdir(exist_ok=True, parents=True)
209209

@@ -246,40 +246,45 @@ async def upload_small_annotations(
246246
self,
247247
project: entities.ProjectEntity,
248248
folder: entities.FolderEntity,
249-
items_name_file_map: Dict[str, io.StringIO],
249+
items_name_data_map: Dict[str, dict],
250250
) -> UploadAnnotationsResponse:
251251
url = urljoin(
252252
self.assets_provider_url,
253253
(
254-
f"{self.URL_UPLOAD_ANNOTATIONS}?{'&'.join(f'image_names[]={item_name}' for item_name in items_name_file_map.keys())}"
254+
f"{self.URL_UPLOAD_ANNOTATIONS}?{'&'.join(f'image_names[]={item_name}' for item_name in items_name_data_map.keys())}"
255255
),
256256
)
257257

258258
headers = copy.copy(self.client.default_headers)
259259
del headers["Content-Type"]
260-
async with aiohttp.ClientSession(
260+
async with AIOHttpSession(
261261
headers=headers,
262262
connector=aiohttp.TCPConnector(ssl=False),
263-
raise_for_status=True,
264263
) as session:
265-
data = aiohttp.FormData(quote_fields=False)
266-
for key, file in items_name_file_map.items():
267-
file.seek(0)
268-
data.add_field(
264+
form_data = aiohttp.FormData(
265+
quote_fields=False,
266+
)
267+
tmp = {}
268+
for name, data in items_name_data_map.items():
269+
tmp[name] = io.StringIO()
270+
json.dump({"data": data}, tmp[name], allow_nan=False)
271+
tmp[name].seek(0)
272+
273+
for key, data in tmp.items():
274+
form_data.add_field(
269275
key,
270-
bytes(file.read(), "ascii"),
276+
data,
271277
filename=key,
272278
content_type="application/json",
273279
)
274280

275-
_response = await session.post(
276-
url,
277-
params={
278-
"team_id": project.team_id,
279-
"project_id": project.id,
280-
"folder_id": folder.id,
281-
},
282-
data=data,
281+
params = {
282+
"team_id": project.team_id,
283+
"project_id": project.id,
284+
"folder_id": folder.id,
285+
}
286+
_response = await session.request(
287+
"post", url, params=params, data=form_data
283288
)
284289
if not _response.ok:
285290
logger.debug(f"Status code {str(_response.status)}")
@@ -301,23 +306,20 @@ async def upload_big_annotation(
301306
data: io.StringIO,
302307
chunk_size: int,
303308
) -> bool:
304-
async with aiohttp.ClientSession(
309+
async with AIOHttpSession(
305310
connector=aiohttp.TCPConnector(ssl=False),
306311
headers=self.client.default_headers,
307-
raise_for_status=True,
308312
) as session:
309313
params = {
310314
"team_id": project.team_id,
311315
"project_id": project.id,
312316
"folder_id": folder.id,
313317
}
314-
start_response = await session.post(
315-
urljoin(
316-
self.assets_provider_url,
317-
self.URL_START_FILE_UPLOAD_PROCESS.format(item_id=item_id),
318-
),
319-
params=params,
318+
url = urljoin(
319+
self.assets_provider_url,
320+
self.URL_START_FILE_UPLOAD_PROCESS.format(item_id=item_id),
320321
)
322+
start_response = await session.request("post", url, params=params)
321323
if not start_response.ok:
322324
raise AppException(str(await start_response.text()))
323325
process_info = await start_response.json()
@@ -331,7 +333,8 @@ async def upload_big_annotation(
331333
params["chunk_id"] = chunk_id
332334
if chunk:
333335
data_sent = True
334-
response = await session.post(
336+
response = await session.request(
337+
"post",
335338
urljoin(
336339
self.assets_provider_url,
337340
self.URL_START_FILE_SEND_PART.format(item_id=item_id),
@@ -348,7 +351,8 @@ async def upload_big_annotation(
348351
if len(chunk) < chunk_size:
349352
break
350353
del params["chunk_id"]
351-
response = await session.post(
354+
response = await session.request(
355+
"post",
352356
urljoin(
353357
self.assets_provider_url,
354358
self.URL_START_FILE_SEND_FINISH.format(item_id=item_id),
@@ -359,7 +363,8 @@ async def upload_big_annotation(
359363
if not response.ok:
360364
raise AppException(str(await response.text()))
361365
del params["path"]
362-
response = await session.post(
366+
response = await session.request(
367+
"post",
363368
urljoin(
364369
self.assets_provider_url,
365370
self.URL_START_FILE_SYNC.format(item_id=item_id),
@@ -370,7 +375,8 @@ async def upload_big_annotation(
370375
if not response.ok:
371376
raise AppException(str(await response.text()))
372377
while True:
373-
response = await session.get(
378+
response = await session.request(
379+
"get",
374380
urljoin(
375381
self.assets_provider_url,
376382
self.URL_START_FILE_SYNC_STATUS.format(item_id=item_id),

src/superannotate/lib/infrastructure/services/http_client.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import json
23
import logging
34
import platform
@@ -10,6 +11,7 @@
1011
from typing import Dict
1112
from typing import List
1213

14+
import aiohttp
1315
import pydantic
1416
import requests
1517
from lib.core.exceptions import AppException
@@ -213,3 +215,20 @@ def serialize_response(
213215
data["res_error"] = response.content
214216
data["reason"] = response.reason
215217
return content_type(**data)
218+
219+
220+
class AIOHttpSession(aiohttp.ClientSession):
221+
RETRY_STATUS_CODES = [401, 403, 502, 503, 504]
222+
RETRY_LIMIT = 3
223+
BACKOFF_FACTOR = 0.3
224+
225+
async def request(self, *args, **kwargs) -> aiohttp.ClientResponse:
226+
attempts = self.RETRY_LIMIT
227+
delay = 0
228+
for _ in range(attempts):
229+
delay += self.BACKOFF_FACTOR
230+
attempts -= 1
231+
response = await super()._request(*args, **kwargs)
232+
if response.status not in self.RETRY_STATUS_CODES or not attempts:
233+
return response
234+
await asyncio.sleep(delay)

0 commit comments

Comments (0)