Skip to content

Commit ba23b00

Browse files
committed
fix(storage): decrement KB document storage atomically with deletion
Address Cursor review: hardDeleteDocuments deleted the rows then decremented best-effort, so a decrement failure left billed storage inflated with no row to reconcile. Resolve each owner's subscription up front, then decrement inside the same transaction that deletes the embeddings/documents (decrementStorageUsageInTx, also now shared by releaseDeletedFileStorage), so the counter and the content commit or roll back together. Connector docs remain excluded.
1 parent 9d9186e commit ba23b00

3 files changed

Lines changed: 58 additions & 29 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export { checkStorageQuota, getUserStorageLimit, getUserStorageUsage } from './limits'
22
export {
33
decrementStorageUsage,
4+
decrementStorageUsageInTx,
45
incrementStorageUsage,
56
releaseDeletedFileStorage,
67
} from './tracking'

apps/sim/lib/billing/storage/tracking.ts

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,36 @@ export async function decrementStorageUsage(
166166
}
167167
}
168168

169+
type StorageTransaction = Parameters<Parameters<typeof db.transaction>[0]>[0]
170+
171+
/**
172+
* Decrement a user's (or their org's) storage counter inside an existing
173+
* transaction, using a pre-resolved subscription. This lets a caller make the
174+
* counter update atomic with the DB rows it is deleting (e.g. hard-deleting
175+
* documents), so a failure of either rolls back both — no inflated counter, no
176+
* over-decrement. The caller resolves the subscription (a read) before opening
177+
* the transaction.
178+
*/
179+
export async function decrementStorageUsageInTx(
180+
tx: StorageTransaction,
181+
sub: HighestPrioritySubscription | null,
182+
userId: string,
183+
bytes: number
184+
): Promise<void> {
185+
if (!isBillingEnabled || bytes <= 0) return
186+
if (isOrgScopedSubscription(sub, userId) && sub) {
187+
await tx
188+
.update(organization)
189+
.set({ storageUsedBytes: sql`GREATEST(0, ${organization.storageUsedBytes} - ${bytes})` })
190+
.where(eq(organization.id, sub.referenceId))
191+
} else {
192+
await tx
193+
.update(userStats)
194+
.set({ storageUsedBytes: sql`GREATEST(0, ${userStats.storageUsedBytes} - ${bytes})` })
195+
.where(eq(userStats.userId, userId))
196+
}
197+
}
198+
169199
/**
170200
* Atomically soft-delete a file's metadata row and decrement the owner's storage
171201
* counter in a single transaction.
@@ -193,7 +223,6 @@ export async function releaseDeletedFileStorage(
193223

194224
const { getHighestPrioritySubscription } = await import('@/lib/billing/core/subscription')
195225
const sub = await getHighestPrioritySubscription(userId)
196-
const orgScoped = isOrgScopedSubscription(sub, userId) && sub !== null
197226

198227
let claimed = false
199228
await db.transaction(async (tx) => {
@@ -204,18 +233,7 @@ export async function releaseDeletedFileStorage(
204233
.returning({ id: workspaceFiles.id })
205234
if (claimedRows.length === 0) return
206235
claimed = true
207-
208-
if (orgScoped && sub) {
209-
await tx
210-
.update(organization)
211-
.set({ storageUsedBytes: sql`GREATEST(0, ${organization.storageUsedBytes} - ${bytes})` })
212-
.where(eq(organization.id, sub.referenceId))
213-
} else {
214-
await tx
215-
.update(userStats)
216-
.set({ storageUsedBytes: sql`GREATEST(0, ${userStats.storageUsedBytes} - ${bytes})` })
217-
.where(eq(userStats.userId, userId))
218-
}
236+
await decrementStorageUsageInTx(tx, sub, userId, bytes)
219237
})
220238

221239
if (claimed && workspaceId) {

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ import { generateId } from '@sim/utils/id'
1414
import { tasks } from '@trigger.dev/sdk'
1515
import { and, asc, desc, eq, inArray, isNotNull, isNull, type SQL, sql } from 'drizzle-orm'
1616
import { checkActorUsageLimits } from '@/lib/billing/calculations/usage-monitor'
17+
import type { HighestPrioritySubscription } from '@/lib/billing/core/plan'
18+
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
1719
import { recordUsage } from '@/lib/billing/core/usage-log'
1820
import {
1921
checkStorageQuota,
20-
decrementStorageUsage,
22+
decrementStorageUsageInTx,
2123
incrementStorageUsage,
2224
} from '@/lib/billing/storage'
2325
import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
@@ -2051,28 +2053,36 @@ export async function hardDeleteDocuments(
20512053

20522054
const existingIds = documentsToDelete.map((doc) => doc.id)
20532055

2056+
// Sum the billable bytes per owning user. Connector-synced documents are never
2057+
// metered at ingest (the sync engine inserts them directly), so they must not
2058+
// be decremented here.
2059+
const bytesByUser = new Map<string, number>()
2060+
for (const doc of documentsToDelete) {
2061+
if (doc.connectorId || doc.fileSize <= 0) continue
2062+
const billedUserId = doc.uploadedBy ?? doc.kbUserId
2063+
if (!billedUserId) continue
2064+
bytesByUser.set(billedUserId, (bytesByUser.get(billedUserId) ?? 0) + doc.fileSize)
2065+
}
2066+
2067+
// Resolve each owner's subscription before the transaction (these are reads),
2068+
// then decrement the counters inside the same transaction that deletes the
2069+
// rows, so a failure of either rolls both back — the billed storage can never
2070+
// be left inflated once the content is gone.
2071+
const subByUser = new Map<string, HighestPrioritySubscription | null>()
2072+
for (const billedUserId of bytesByUser.keys()) {
2073+
subByUser.set(billedUserId, await getHighestPrioritySubscription(billedUserId))
2074+
}
2075+
20542076
await db.transaction(async (tx) => {
20552077
await tx.delete(embedding).where(inArray(embedding.documentId, existingIds))
20562078
await tx.delete(document).where(inArray(document.id, existingIds))
2079+
for (const [billedUserId, bytes] of bytesByUser) {
2080+
await decrementStorageUsageInTx(tx, subByUser.get(billedUserId) ?? null, billedUserId, bytes)
2081+
}
20572082
})
20582083

20592084
await deleteDocumentStorageFiles(documentsToDelete, requestId)
20602085

2061-
for (const doc of documentsToDelete) {
2062-
// Connector-synced documents are never metered at ingest (the sync engine
2063-
// inserts them directly), so they must not be decremented here — doing so
2064-
// would erode legitimately-counted bytes in the same owner counter.
2065-
if (doc.connectorId) continue
2066-
const billedUserId = doc.uploadedBy ?? doc.kbUserId
2067-
if (billedUserId && doc.fileSize > 0) {
2068-
try {
2069-
await decrementStorageUsage(billedUserId, doc.fileSize, doc.workspaceId ?? undefined)
2070-
} catch (storageError) {
2071-
logger.error(`[${requestId}] Failed to update storage tracking:`, storageError)
2072-
}
2073-
}
2074-
}
2075-
20762086
logger.info(`[${requestId}] Hard deleted ${existingIds.length} documents`, {
20772087
documentIds: existingIds,
20782088
})

0 commit comments

Comments
 (0)