Skip to content

Commit aec0398

Browse files
committed
refactor quota updates to use aggregation pipeline
replaces the previous multi-step update approach with a single aggregation pipeline that handles both "add" and "set" modes, ensuring atomic quota modifications
1 parent 9fa5509 commit aec0398

File tree

1 file changed

+180
-125
lines changed

1 file changed

+180
-125
lines changed

backend/btrixcloud/orgs.py

Lines changed: 180 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -8,114 +8,112 @@
88
import math
99
import os
1010
import time
11-
12-
from uuid import UUID, uuid4
11+
from calendar import c
1312
from tempfile import NamedTemporaryFile
14-
1513
from typing import (
16-
Awaitable,
17-
Optional,
1814
TYPE_CHECKING,
19-
Dict,
15+
Any,
16+
AsyncGenerator,
17+
Awaitable,
2018
Callable,
19+
Dict,
2120
List,
2221
Literal,
23-
AsyncGenerator,
24-
Any,
22+
Optional,
2523
)
24+
from uuid import UUID, uuid4
2625

26+
import json_stream
27+
from aiostream import stream
28+
from fastapi import APIRouter, Depends, HTTPException, Request
29+
from fastapi.responses import StreamingResponse
2730
from motor.motor_asyncio import AsyncIOMotorDatabase
2831
from pydantic import ValidationError
2932
from pymongo import ReturnDocument
3033
from pymongo.collation import Collation
3134
from pymongo.errors import AutoReconnect, DuplicateKeyError
3235

33-
from fastapi import APIRouter, Depends, HTTPException, Request
34-
from fastapi.responses import StreamingResponse
35-
import json_stream
36-
from aiostream import stream
37-
3836
from .models import (
39-
SUCCESSFUL_STATES,
37+
ACTIVE,
38+
MAX_BROWSER_WINDOWS,
39+
MAX_CRAWL_SCALE,
40+
PAUSED_PAYMENT_FAILED,
41+
REASON_PAUSED,
4042
RUNNING_STATES,
43+
SUCCESSFUL_STATES,
4144
WAITING_STATES,
45+
AddedResponse,
46+
AddedResponseId,
47+
AddToOrgRequest,
4248
BaseCrawl,
49+
Collection,
50+
ConfigRevision,
51+
Crawl,
52+
CrawlConfig,
53+
CrawlConfigDefaults,
54+
DeleteCrawlList,
55+
DeletedResponseId,
56+
InvitePending,
57+
InviteToOrgRequest,
58+
OrgAcceptInviteResponse,
4359
Organization,
44-
PlansResponse,
45-
StorageRef,
60+
OrgCreate,
61+
OrgDeleteInviteResponse,
62+
OrgImportResponse,
63+
OrgInviteResponse,
64+
OrgMetrics,
65+
OrgOut,
66+
OrgOutExport,
67+
OrgProxies,
68+
OrgPublicProfileUpdate,
4669
OrgQuotas,
4770
OrgQuotasIn,
4871
OrgQuotaUpdate,
49-
OrgReadOnlyUpdate,
5072
OrgReadOnlyOnCancel,
51-
OrgMetrics,
73+
OrgReadOnlyUpdate,
74+
OrgSlugsResponse,
5275
OrgWebhookUrls,
53-
OrgCreate,
54-
OrgProxies,
55-
Subscription,
56-
SubscriptionUpdate,
57-
SubscriptionCancel,
58-
RenameOrg,
59-
UpdateRole,
60-
RemovePendingInvite,
61-
RemoveFromOrg,
62-
AddToOrgRequest,
63-
InvitePending,
64-
InviteToOrgRequest,
65-
UserRole,
66-
User,
76+
PageWithAllQA,
6777
PaginatedInvitePendingResponse,
6878
PaginatedOrgOutResponse,
69-
CrawlConfig,
70-
Crawl,
71-
CrawlConfigDefaults,
72-
UploadedCrawl,
73-
ConfigRevision,
79+
PlansResponse,
7480
Profile,
75-
Collection,
76-
OrgOut,
77-
OrgOutExport,
78-
PageWithAllQA,
79-
DeleteCrawlList,
80-
PAUSED_PAYMENT_FAILED,
81-
REASON_PAUSED,
82-
ACTIVE,
83-
DeletedResponseId,
84-
UpdatedResponse,
85-
AddedResponse,
86-
AddedResponseId,
87-
SuccessResponseId,
88-
OrgInviteResponse,
89-
OrgAcceptInviteResponse,
90-
OrgDeleteInviteResponse,
9181
RemovedResponse,
92-
OrgSlugsResponse,
93-
OrgImportResponse,
94-
OrgPublicProfileUpdate,
95-
MAX_BROWSER_WINDOWS,
96-
MAX_CRAWL_SCALE,
82+
RemoveFromOrg,
83+
RemovePendingInvite,
84+
RenameOrg,
85+
StorageRef,
86+
Subscription,
87+
SubscriptionCancel,
88+
SubscriptionUpdate,
89+
SuccessResponseId,
90+
UpdatedResponse,
91+
UpdateRole,
92+
UploadedCrawl,
93+
User,
94+
UserRole,
9795
)
9896
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
9997
from .utils import (
98+
JSONSerializer,
99+
browser_windows_from_scale,
100100
dt_now,
101-
slug_from_name,
102-
validate_slug,
103101
get_duplicate_key_error_field,
102+
slug_from_name,
104103
validate_language_code,
105-
JSONSerializer,
106-
browser_windows_from_scale,
104+
validate_slug,
107105
)
108106

109107
if TYPE_CHECKING:
110-
from .invites import InviteOps
108+
from .background_jobs import BackgroundJobOps
111109
from .basecrawls import BaseCrawlOps
112110
from .colls import CollectionOps
111+
from .crawlmanager import CrawlManager
112+
from .file_uploads import FileUploadOps
113+
from .invites import InviteOps
114+
from .pages import PageOps
113115
from .profiles import ProfileOps
114116
from .users import UserManager
115-
from .background_jobs import BackgroundJobOps
116-
from .pages import PageOps
117-
from .file_uploads import FileUploadOps
118-
from .crawlmanager import CrawlManager
119117
else:
120118
InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = object
121119
BackgroundJobOps = UserManager = PageOps = FileUploadOps = CrawlManager = object
@@ -628,74 +626,131 @@ async def update_quotas(
628626

629627
quotas.context = None
630628

631-
previous_extra_mins = (
632-
org.quotas.extraExecMinutes
633-
if (org.quotas and org.quotas.extraExecMinutes)
634-
else 0
635-
)
636-
previous_gifted_mins = (
637-
org.quotas.giftedExecMinutes
638-
if (org.quotas and org.quotas.giftedExecMinutes)
639-
else 0
640-
)
641-
642-
if mode == "add":
643-
increment_update: dict[str, Any] = {
644-
"$inc": {},
629+
update: list[dict[str, Any]] = [
630+
{
631+
"$set": {
632+
"quotaUpdates": {
633+
"$concatArrays": [
634+
"$quotaUpdates",
635+
[{"modified": dt_now(), "update": {}}],
636+
]
637+
},
638+
}
645639
}
640+
]
646641

647-
for field, value in quotas.model_dump(
648-
exclude_unset=True, exclude_defaults=True, exclude_none=True
649-
).items():
650-
if field == "context" or value is None:
651-
continue
652-
inc = max(value, -org.quotas.model_dump().get(field, 0))
653-
increment_update["$inc"][f"quotas.{field}"] = inc
642+
computed_quotas = {}
654643

655-
updated_org = await self.orgs.find_one_and_update(
656-
{"_id": org.id},
657-
increment_update,
658-
projection={"quotas": True},
659-
return_document=ReturnDocument.AFTER,
644+
if mode == "add":
645+
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["context"] = (
646+
context
660647
)
661-
quotas = OrgQuotasIn(**updated_org["quotas"])
662-
663-
update: dict[str, dict[str, dict[str, Any] | int]] = {
664-
"$push": {
665-
"quotaUpdates": OrgQuotaUpdate(
666-
modified=dt_now(),
667-
update=OrgQuotas(
668-
**quotas.model_dump(
669-
exclude_unset=True, exclude_defaults=True, exclude_none=True
670-
)
671-
),
672-
context=context,
673-
).model_dump()
674-
},
675-
"$inc": {},
676-
"$set": {},
677-
}
678-
679-
if mode == "set":
648+
for field, value in quotas.model_dump().items():
649+
if field == "context":
650+
continue
651+
if value is None:
652+
# set value of field in pushed update to current value in `quotas`
653+
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["update"][
654+
field
655+
] = f"$quotas.{field}"
656+
computed_quotas[field] = f"$quotas.{field}"
657+
continue
658+
new_value = {
659+
"$add": [
660+
{
661+
"$cond": {
662+
"if": {
663+
"$gt": [
664+
{"$multiply": [f"$quotas.{field}", -1]},
665+
value,
666+
]
667+
},
668+
"then": {"$multiply": [f"$quotas.{field}", -1]},
669+
"else": value,
670+
}
671+
},
672+
f"$quotas.{field}",
673+
]
674+
}
675+
# set value of field in pushed update to current value in quotas + increment
676+
update[0]["$set"][f"quotas.{field}"] = new_value
677+
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["update"][
678+
field
679+
] = new_value
680+
computed_quotas[field] = new_value
681+
682+
elif mode == "set":
680683
increment_update = quotas.model_dump(
681684
exclude_unset=True, exclude_defaults=True, exclude_none=True
682685
)
683-
update["$set"]["quotas"] = increment_update
686+
for field, value in increment_update.items():
687+
update[0]["$set"][f"quotas.{field}"] = value
688+
computed_quotas[field] = value
689+
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0] = OrgQuotaUpdate(
690+
modified=dt_now(),
691+
update=OrgQuotas(
692+
**quotas.model_dump(
693+
exclude_unset=True, exclude_defaults=True, exclude_none=True
694+
)
695+
),
696+
context=context,
697+
).model_dump()
684698

685699
# Inc org available fields for extra/gifted execution time as needed
686-
if quotas.extraExecMinutes is not None:
687-
extra_secs_diff = (quotas.extraExecMinutes - previous_extra_mins) * 60
688-
if org.extraExecSecondsAvailable + extra_secs_diff <= 0:
689-
update["$set"]["extraExecSecondsAvailable"] = 0
690-
else:
691-
update["$inc"]["extraExecSecondsAvailable"] = extra_secs_diff
700+
for extra_or_gifted in ["extra", "gifted"]:
701+
previous_mins = {
702+
"$cond": {
703+
"if": {
704+
"$or": [
705+
{"$ne": [f"$quotas.{extra_or_gifted}ExecMinutes", 0]},
706+
{"$ne": [f"$quotas.{extra_or_gifted}ExecMinutes", None]},
707+
]
708+
},
709+
"then": f"$quotas.{extra_or_gifted}ExecMinutes",
710+
"else": 0,
711+
}
712+
}
692713

693-
if quotas.giftedExecMinutes is not None:
694-
gifted_secs_diff = (quotas.giftedExecMinutes - previous_gifted_mins) * 60
695-
if org.giftedExecSecondsAvailable + gifted_secs_diff <= 0:
696-
update["$set"]["giftedExecSecondsAvailable"] = 0
697-
else:
698-
update["$inc"]["giftedExecSecondsAvailable"] = gifted_secs_diff
714+
secs_diff = {
715+
"$multiply": [
716+
{
717+
"$subtract": [
718+
computed_quotas[f"{extra_or_gifted}ExecMinutes"],
719+
previous_mins,
720+
]
721+
},
722+
60,
723+
]
724+
}
725+
726+
update[0]["$set"][f"{extra_or_gifted}ExecSecondsAvailable"] = {
727+
"$cond": {
728+
"if": {"$ne": [f"${extra_or_gifted}ExecSecondsAvailable", None]},
729+
"then": {
730+
"$cond": {
731+
"if": {
732+
"$lte": [
733+
{
734+
"$add": [
735+
f"${extra_or_gifted}ExecSecondsAvailable",
736+
secs_diff,
737+
]
738+
},
739+
0,
740+
]
741+
},
742+
"then": 0,
743+
"else": {
744+
"$add": [
745+
f"${extra_or_gifted}ExecSecondsAvailable",
746+
secs_diff,
747+
]
748+
},
749+
}
750+
},
751+
"else": f"${extra_or_gifted}ExecSecondsAvailable",
752+
}
753+
}
699754

700755
await self.orgs.find_one_and_update({"_id": org.id}, update)
701756

0 commit comments

Comments
 (0)