Skip to content

Commit 91e6b98

Browse files
committed
refactor quota updates to use aggregation pipeline
replaces the previous multi-step update approach with a single aggregation pipeline that handles both "add" and "set" modes, ensuring atomic quota modifications
1 parent da561e8 commit 91e6b98

File tree

1 file changed

+181
-126
lines changed

1 file changed

+181
-126
lines changed

backend/btrixcloud/orgs.py

Lines changed: 181 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -8,114 +8,112 @@
88
import math
99
import os
1010
import time
11-
12-
from uuid import UUID, uuid4
11+
from calendar import c
1312
from tempfile import NamedTemporaryFile
14-
1513
from typing import (
16-
Awaitable,
17-
Optional,
1814
TYPE_CHECKING,
19-
Dict,
15+
Any,
16+
AsyncGenerator,
17+
Awaitable,
2018
Callable,
19+
Dict,
2120
List,
2221
Literal,
23-
AsyncGenerator,
24-
Any,
22+
Optional,
2523
)
24+
from uuid import UUID, uuid4
2625

26+
import json_stream
27+
from aiostream import stream
28+
from fastapi import APIRouter, Depends, HTTPException, Request
29+
from fastapi.responses import StreamingResponse
2730
from motor.motor_asyncio import AsyncIOMotorDatabase
2831
from pydantic import ValidationError
2932
from pymongo import ReturnDocument
3033
from pymongo.errors import AutoReconnect, DuplicateKeyError
3134

32-
from fastapi import APIRouter, Depends, HTTPException, Request
33-
from fastapi.responses import StreamingResponse
34-
import json_stream
35-
from aiostream import stream
36-
3735
from .models import (
38-
SUCCESSFUL_STATES,
36+
ACTIVE,
37+
MAX_BROWSER_WINDOWS,
38+
MAX_CRAWL_SCALE,
39+
PAUSED_PAYMENT_FAILED,
40+
REASON_PAUSED,
3941
RUNNING_STATES,
42+
SUCCESSFUL_STATES,
4043
WAITING_STATES,
44+
AddedResponse,
45+
AddedResponseId,
46+
AddToOrgRequest,
4147
BaseCrawl,
48+
Collection,
49+
ConfigRevision,
50+
Crawl,
51+
CrawlConfig,
52+
CrawlConfigDefaults,
53+
DeleteCrawlList,
54+
DeletedResponseId,
55+
InvitePending,
56+
InviteToOrgRequest,
57+
OrgAcceptInviteResponse,
4258
Organization,
43-
PlansResponse,
44-
StorageRef,
59+
OrgCreate,
60+
OrgDeleteInviteResponse,
61+
OrgImportResponse,
62+
OrgInviteResponse,
63+
OrgMetrics,
64+
OrgOut,
65+
OrgOutExport,
66+
OrgProxies,
67+
OrgPublicProfileUpdate,
4568
OrgQuotas,
4669
OrgQuotasIn,
4770
OrgQuotaUpdate,
48-
OrgReadOnlyUpdate,
4971
OrgReadOnlyOnCancel,
50-
OrgMetrics,
72+
OrgReadOnlyUpdate,
73+
OrgSlugsResponse,
5174
OrgWebhookUrls,
52-
OrgCreate,
53-
OrgProxies,
54-
Subscription,
55-
SubscriptionUpdate,
56-
SubscriptionCancel,
57-
RenameOrg,
58-
UpdateRole,
59-
RemovePendingInvite,
60-
RemoveFromOrg,
61-
AddToOrgRequest,
62-
InvitePending,
63-
InviteToOrgRequest,
64-
UserRole,
65-
User,
75+
PageWithAllQA,
6676
PaginatedInvitePendingResponse,
6777
PaginatedOrgOutResponse,
68-
CrawlConfig,
69-
Crawl,
70-
CrawlConfigDefaults,
71-
UploadedCrawl,
72-
ConfigRevision,
78+
PlansResponse,
7379
Profile,
74-
Collection,
75-
OrgOut,
76-
OrgOutExport,
77-
PageWithAllQA,
78-
DeleteCrawlList,
79-
PAUSED_PAYMENT_FAILED,
80-
REASON_PAUSED,
81-
ACTIVE,
82-
DeletedResponseId,
83-
UpdatedResponse,
84-
AddedResponse,
85-
AddedResponseId,
86-
SuccessResponseId,
87-
OrgInviteResponse,
88-
OrgAcceptInviteResponse,
89-
OrgDeleteInviteResponse,
9080
RemovedResponse,
91-
OrgSlugsResponse,
92-
OrgImportResponse,
93-
OrgPublicProfileUpdate,
94-
MAX_BROWSER_WINDOWS,
95-
MAX_CRAWL_SCALE,
81+
RemoveFromOrg,
82+
RemovePendingInvite,
83+
RenameOrg,
84+
StorageRef,
85+
Subscription,
86+
SubscriptionCancel,
87+
SubscriptionUpdate,
88+
SuccessResponseId,
89+
UpdatedResponse,
90+
UpdateRole,
91+
UploadedCrawl,
92+
User,
93+
UserRole,
9694
)
9795
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
9896
from .utils import (
99-
dt_now,
100-
slug_from_name,
101-
validate_slug,
102-
get_duplicate_key_error_field,
103-
validate_language_code,
10497
JSONSerializer,
10598
browser_windows_from_scale,
10699
case_insensitive_collation,
100+
dt_now,
101+
get_duplicate_key_error_field,
102+
slug_from_name,
103+
validate_language_code,
104+
validate_slug,
107105
)
108106

109107
if TYPE_CHECKING:
110-
from .invites import InviteOps
108+
from .background_jobs import BackgroundJobOps
111109
from .basecrawls import BaseCrawlOps
112110
from .colls import CollectionOps
111+
from .crawlmanager import CrawlManager
112+
from .file_uploads import FileUploadOps
113+
from .invites import InviteOps
114+
from .pages import PageOps
113115
from .profiles import ProfileOps
114116
from .users import UserManager
115-
from .background_jobs import BackgroundJobOps
116-
from .pages import PageOps
117-
from .file_uploads import FileUploadOps
118-
from .crawlmanager import CrawlManager
119117
else:
120118
InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = object
121119
BackgroundJobOps = UserManager = PageOps = FileUploadOps = CrawlManager = object
@@ -627,74 +625,131 @@ async def update_quotas(
627625

628626
quotas.context = None
629627

630-
previous_extra_mins = (
631-
org.quotas.extraExecMinutes
632-
if (org.quotas and org.quotas.extraExecMinutes)
633-
else 0
634-
)
635-
previous_gifted_mins = (
636-
org.quotas.giftedExecMinutes
637-
if (org.quotas and org.quotas.giftedExecMinutes)
638-
else 0
639-
)
640-
641-
if mode == "add":
642-
increment_update: dict[str, Any] = {
643-
"$inc": {},
628+
update: list[dict[str, Any]] = [
629+
{
630+
"$set": {
631+
"quotaUpdates": {
632+
"$concatArrays": [
633+
"$quotaUpdates",
634+
[{"modified": dt_now(), "update": {}}],
635+
]
636+
},
637+
}
644638
}
639+
]
645640

646-
for field, value in quotas.model_dump(
647-
exclude_unset=True, exclude_defaults=True, exclude_none=True
648-
).items():
649-
if field == "context" or value is None:
650-
continue
651-
inc = max(value, -org.quotas.model_dump().get(field, 0))
652-
increment_update["$inc"][f"quotas.{field}"] = inc
641+
computed_quotas = {}
653642

654-
updated_org = await self.orgs.find_one_and_update(
655-
{"_id": org.id},
656-
increment_update,
657-
projection={"quotas": True},
658-
return_document=ReturnDocument.AFTER,
643+
if mode == "add":
644+
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["context"] = (
645+
context
659646
)
660-
quotas = OrgQuotasIn(**updated_org["quotas"])
661-
662-
update: dict[str, dict[str, dict[str, Any] | int]] = {
663-
"$push": {
664-
"quotaUpdates": OrgQuotaUpdate(
665-
modified=dt_now(),
666-
update=OrgQuotas(
667-
**quotas.model_dump(
668-
exclude_unset=True, exclude_defaults=True, exclude_none=True
669-
)
670-
),
671-
context=context,
672-
).model_dump()
673-
},
674-
"$inc": {},
675-
"$set": {},
676-
}
677-
678-
if mode == "set":
647+
for field, value in quotas.model_dump().items():
648+
if field == "context":
649+
continue
650+
if value is None:
651+
# set value of field in pushed update to current value in `quotas`
652+
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["update"][
653+
field
654+
] = f"$quotas.{field}"
655+
computed_quotas[field] = f"$quotas.{field}"
656+
continue
657+
new_value = {
658+
"$add": [
659+
{
660+
"$cond": {
661+
"if": {
662+
"$gt": [
663+
{"$multiply": [f"$quotas.{field}", -1]},
664+
value,
665+
]
666+
},
667+
"then": {"$multiply": [f"$quotas.{field}", -1]},
668+
"else": value,
669+
}
670+
},
671+
f"$quotas.{field}",
672+
]
673+
}
674+
# set value of field in pushed update to current value in quotas + increment
675+
update[0]["$set"][f"quotas.{field}"] = new_value
676+
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["update"][
677+
field
678+
] = new_value
679+
computed_quotas[field] = new_value
680+
681+
elif mode == "set":
679682
increment_update = quotas.model_dump(
680683
exclude_unset=True, exclude_defaults=True, exclude_none=True
681684
)
682-
update["$set"]["quotas"] = increment_update
685+
for field, value in increment_update.items():
686+
update[0]["$set"][f"quotas.{field}"] = value
687+
computed_quotas[field] = value
688+
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0] = OrgQuotaUpdate(
689+
modified=dt_now(),
690+
update=OrgQuotas(
691+
**quotas.model_dump(
692+
exclude_unset=True, exclude_defaults=True, exclude_none=True
693+
)
694+
),
695+
context=context,
696+
).model_dump()
683697

684698
# Inc org available fields for extra/gifted execution time as needed
685-
if quotas.extraExecMinutes is not None:
686-
extra_secs_diff = (quotas.extraExecMinutes - previous_extra_mins) * 60
687-
if org.extraExecSecondsAvailable + extra_secs_diff <= 0:
688-
update["$set"]["extraExecSecondsAvailable"] = 0
689-
else:
690-
update["$inc"]["extraExecSecondsAvailable"] = extra_secs_diff
699+
for extra_or_gifted in ["extra", "gifted"]:
700+
previous_mins = {
701+
"$cond": {
702+
"if": {
703+
"$or": [
704+
{"$ne": [f"$quotas.{extra_or_gifted}ExecMinutes", 0]},
705+
{"$ne": [f"$quotas.{extra_or_gifted}ExecMinutes", None]},
706+
]
707+
},
708+
"then": f"$quotas.{extra_or_gifted}ExecMinutes",
709+
"else": 0,
710+
}
711+
}
691712

692-
if quotas.giftedExecMinutes is not None:
693-
gifted_secs_diff = (quotas.giftedExecMinutes - previous_gifted_mins) * 60
694-
if org.giftedExecSecondsAvailable + gifted_secs_diff <= 0:
695-
update["$set"]["giftedExecSecondsAvailable"] = 0
696-
else:
697-
update["$inc"]["giftedExecSecondsAvailable"] = gifted_secs_diff
713+
secs_diff = {
714+
"$multiply": [
715+
{
716+
"$subtract": [
717+
computed_quotas[f"{extra_or_gifted}ExecMinutes"],
718+
previous_mins,
719+
]
720+
},
721+
60,
722+
]
723+
}
724+
725+
update[0]["$set"][f"{extra_or_gifted}ExecSecondsAvailable"] = {
726+
"$cond": {
727+
"if": {"$ne": [f"${extra_or_gifted}ExecSecondsAvailable", None]},
728+
"then": {
729+
"$cond": {
730+
"if": {
731+
"$lte": [
732+
{
733+
"$add": [
734+
f"${extra_or_gifted}ExecSecondsAvailable",
735+
secs_diff,
736+
]
737+
},
738+
0,
739+
]
740+
},
741+
"then": 0,
742+
"else": {
743+
"$add": [
744+
f"${extra_or_gifted}ExecSecondsAvailable",
745+
secs_diff,
746+
]
747+
},
748+
}
749+
},
750+
"else": f"${extra_or_gifted}ExecSecondsAvailable",
751+
}
752+
}
698753

699754
await self.orgs.find_one_and_update({"_id": org.id}, update)
700755

0 commit comments

Comments
 (0)