Skip to content

Commit 4295a5c

Browse files
TheodoreSpeakswaleedlatif1Sg312icecrasher321
authored
improvement(db): add session statement/lock timeouts; simplify KB doc tx (#4593)
* v0.6.29: login improvements, posthog telemetry (#4026) * feat(posthog): Add tracking on mothership abort (#4023) Co-authored-by: Theodore Li <theo@sim.ai> * fix(login): fix captcha headers for manual login (#4025) * fix(signup): fix turnstile key loading * fix(login): fix captcha header passing * Catch user already exists, remove login form captcha * improvement(db): add session statement/lock timeouts; simplify KB doc tx * fix(knowledge): close soft-delete TOCTOU on KB document insert Fix the race the bots flagged: KB delete is soft (`deletedAt = now`) so the FK can't catch a concurrent KB delete between the existence check and the document insert. - Add `insertDocumentsIfKbAlive` helper that gates the insert on `EXISTS(SELECT 1 FROM knowledge_base WHERE id=$kb AND deleted_at IS NULL)` in the same statement via INSERT...SELECT...WHERE EXISTS. Atomic at the MVCC snapshot — no transaction, no row lock. - Use jsonb_to_recordset to declare column types once, avoiding per-param casts for nullable columns. - Wire into both `createDocumentRecords` (bulk) and `createSingleDocument`. - Keep the upfront KB existence check as a fast-path early-out for the common case; the atomic insert is the race guard. --------- Co-authored-by: Waleed <walif6@gmail.com> Co-authored-by: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com> Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
1 parent c3ac54e commit 4295a5c

4 files changed

Lines changed: 229 additions & 100 deletions

File tree

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 213 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,126 @@ async function processDocumentsWithTrigger(
748748
}
749749
}
750750

751+
interface NewDocumentRow {
752+
id: string
753+
knowledgeBaseId: string
754+
filename: string
755+
fileUrl: string
756+
fileSize: number
757+
mimeType: string
758+
chunkCount: number
759+
tokenCount: number
760+
characterCount: number
761+
processingStatus: 'pending'
762+
enabled: boolean
763+
uploadedAt: Date
764+
tag1: string | null
765+
tag2: string | null
766+
tag3: string | null
767+
tag4: string | null
768+
tag5: string | null
769+
tag6: string | null
770+
tag7: string | null
771+
number1: number | null
772+
number2: number | null
773+
number3: number | null
774+
number4: number | null
775+
number5: number | null
776+
date1: Date | null
777+
date2: Date | null
778+
boolean1: boolean | null
779+
boolean2: boolean | null
780+
boolean3: boolean | null
781+
}
782+
783+
/**
784+
* Insert N document rows IF the parent knowledge base is still alive
785+
* (`deleted_at IS NULL`) at the statement's MVCC snapshot. Returns the
786+
* number of rows actually inserted.
787+
*
788+
* Knowledge bases are soft-deleted, so a normal FK can't catch a concurrent
789+
* delete — the KB row physically remains. We do the existence check and the
790+
* insert in a single statement via INSERT...SELECT...WHERE EXISTS, which
791+
* Postgres evaluates atomically. No transaction or row lock required, no
792+
* race window between check and insert.
793+
*
794+
* Returns 0 if the KB was soft-deleted; caller throws.
795+
*/
796+
async function insertDocumentsIfKbAlive(
797+
rows: NewDocumentRow[],
798+
knowledgeBaseId: string
799+
): Promise<number> {
800+
if (rows.length === 0) return 0
801+
802+
// jsonb_to_recordset declares the column types once, so we don't need to
803+
// cast every parameter individually to keep Postgres' type inference happy
804+
// when nullable columns end up all-NULL across the batch.
805+
const jsonRows = rows.map((d) => ({
806+
id: d.id,
807+
knowledge_base_id: d.knowledgeBaseId,
808+
filename: d.filename,
809+
file_url: d.fileUrl,
810+
file_size: d.fileSize,
811+
mime_type: d.mimeType,
812+
chunk_count: d.chunkCount,
813+
token_count: d.tokenCount,
814+
character_count: d.characterCount,
815+
processing_status: d.processingStatus,
816+
enabled: d.enabled,
817+
uploaded_at: d.uploadedAt.toISOString(),
818+
tag1: d.tag1,
819+
tag2: d.tag2,
820+
tag3: d.tag3,
821+
tag4: d.tag4,
822+
tag5: d.tag5,
823+
tag6: d.tag6,
824+
tag7: d.tag7,
825+
number1: d.number1,
826+
number2: d.number2,
827+
number3: d.number3,
828+
number4: d.number4,
829+
number5: d.number5,
830+
date1: d.date1?.toISOString() ?? null,
831+
date2: d.date2?.toISOString() ?? null,
832+
boolean1: d.boolean1,
833+
boolean2: d.boolean2,
834+
boolean3: d.boolean3,
835+
}))
836+
837+
const result = await db.execute(sql`
838+
INSERT INTO document (
839+
id, knowledge_base_id, filename, file_url, file_size, mime_type,
840+
chunk_count, token_count, character_count, processing_status, enabled, uploaded_at,
841+
tag1, tag2, tag3, tag4, tag5, tag6, tag7,
842+
number1, number2, number3, number4, number5,
843+
date1, date2,
844+
boolean1, boolean2, boolean3
845+
)
846+
SELECT
847+
id, knowledge_base_id, filename, file_url, file_size, mime_type,
848+
chunk_count, token_count, character_count, processing_status, enabled, uploaded_at,
849+
tag1, tag2, tag3, tag4, tag5, tag6, tag7,
850+
number1, number2, number3, number4, number5,
851+
date1, date2,
852+
boolean1, boolean2, boolean3
853+
FROM jsonb_to_recordset(${JSON.stringify(jsonRows)}::jsonb) AS x(
854+
id text, knowledge_base_id text, filename text, file_url text, file_size integer, mime_type text,
855+
chunk_count integer, token_count integer, character_count integer, processing_status text, enabled boolean, uploaded_at timestamp,
856+
tag1 text, tag2 text, tag3 text, tag4 text, tag5 text, tag6 text, tag7 text,
857+
number1 double precision, number2 double precision, number3 double precision, number4 double precision, number5 double precision,
858+
date1 timestamp, date2 timestamp,
859+
boolean1 boolean, boolean2 boolean, boolean3 boolean
860+
)
861+
WHERE EXISTS (
862+
SELECT 1 FROM knowledge_base
863+
WHERE id = ${knowledgeBaseId} AND deleted_at IS NULL
864+
)
865+
RETURNING id
866+
`)
867+
868+
return Array.from(result).length
869+
}
870+
751871
export async function createDocumentRecords(
752872
documents: Array<{
753873
filename: string
@@ -766,99 +886,102 @@ export async function createDocumentRecords(
766886
knowledgeBaseId: string,
767887
requestId: string
768888
): Promise<DocumentData[]> {
769-
return await db.transaction(async (tx) => {
770-
await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)
771-
772-
const kb = await tx
773-
.select({ id: knowledgeBase.id })
774-
.from(knowledgeBase)
775-
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
776-
.limit(1)
889+
// Cheap upfront existence check so the common KB-not-found path fails fast
890+
// before we burn CPU on tag processing. The atomic insert below is the
891+
// race-safe guard against a concurrent KB soft-delete in the small window
892+
// between this check and the insert.
893+
const kb = await db
894+
.select({ id: knowledgeBase.id })
895+
.from(knowledgeBase)
896+
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
897+
.limit(1)
777898

778-
if (kb.length === 0) {
779-
throw new Error('Knowledge base not found')
780-
}
899+
if (kb.length === 0) {
900+
throw new Error('Knowledge base not found')
901+
}
781902

782-
const now = new Date()
783-
const documentRecords = []
784-
const returnData: DocumentData[] = []
903+
const now = new Date()
904+
const documentRecords: NewDocumentRow[] = []
905+
const returnData: DocumentData[] = []
785906

786-
for (const docData of documents) {
787-
const documentId = generateId()
907+
for (const docData of documents) {
908+
const documentId = generateId()
788909

789-
let processedTags: Partial<ProcessedDocumentTags> = {}
910+
let processedTags: Partial<ProcessedDocumentTags> = {}
790911

791-
if (docData.documentTagsData) {
792-
try {
793-
const tagData = JSON.parse(docData.documentTagsData)
794-
if (Array.isArray(tagData)) {
795-
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
796-
}
797-
} catch (error) {
798-
if (error instanceof SyntaxError) {
799-
logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
800-
} else {
801-
throw error
802-
}
912+
if (docData.documentTagsData) {
913+
try {
914+
const tagData = JSON.parse(docData.documentTagsData)
915+
if (Array.isArray(tagData)) {
916+
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
917+
}
918+
} catch (error) {
919+
if (error instanceof SyntaxError) {
920+
logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
921+
} else {
922+
throw error
803923
}
804924
}
925+
}
805926

806-
const newDocument = {
807-
id: documentId,
808-
knowledgeBaseId,
809-
filename: docData.filename,
810-
fileUrl: docData.fileUrl,
811-
fileSize: docData.fileSize,
812-
mimeType: docData.mimeType,
813-
chunkCount: 0,
814-
tokenCount: 0,
815-
characterCount: 0,
816-
processingStatus: 'pending' as const,
817-
enabled: true,
818-
uploadedAt: now,
819-
tag1: processedTags.tag1 ?? docData.tag1 ?? null,
820-
tag2: processedTags.tag2 ?? docData.tag2 ?? null,
821-
tag3: processedTags.tag3 ?? docData.tag3 ?? null,
822-
tag4: processedTags.tag4 ?? docData.tag4 ?? null,
823-
tag5: processedTags.tag5 ?? docData.tag5 ?? null,
824-
tag6: processedTags.tag6 ?? docData.tag6 ?? null,
825-
tag7: processedTags.tag7 ?? docData.tag7 ?? null,
826-
number1: processedTags.number1 ?? null,
827-
number2: processedTags.number2 ?? null,
828-
number3: processedTags.number3 ?? null,
829-
number4: processedTags.number4 ?? null,
830-
number5: processedTags.number5 ?? null,
831-
date1: processedTags.date1 ?? null,
832-
date2: processedTags.date2 ?? null,
833-
boolean1: processedTags.boolean1 ?? null,
834-
boolean2: processedTags.boolean2 ?? null,
835-
boolean3: processedTags.boolean3 ?? null,
836-
}
837-
838-
documentRecords.push(newDocument)
839-
returnData.push({
840-
documentId,
841-
filename: docData.filename,
842-
fileUrl: docData.fileUrl,
843-
fileSize: docData.fileSize,
844-
mimeType: docData.mimeType,
845-
})
927+
const newDocument = {
928+
id: documentId,
929+
knowledgeBaseId,
930+
filename: docData.filename,
931+
fileUrl: docData.fileUrl,
932+
fileSize: docData.fileSize,
933+
mimeType: docData.mimeType,
934+
chunkCount: 0,
935+
tokenCount: 0,
936+
characterCount: 0,
937+
processingStatus: 'pending' as const,
938+
enabled: true,
939+
uploadedAt: now,
940+
tag1: processedTags.tag1 ?? docData.tag1 ?? null,
941+
tag2: processedTags.tag2 ?? docData.tag2 ?? null,
942+
tag3: processedTags.tag3 ?? docData.tag3 ?? null,
943+
tag4: processedTags.tag4 ?? docData.tag4 ?? null,
944+
tag5: processedTags.tag5 ?? docData.tag5 ?? null,
945+
tag6: processedTags.tag6 ?? docData.tag6 ?? null,
946+
tag7: processedTags.tag7 ?? docData.tag7 ?? null,
947+
number1: processedTags.number1 ?? null,
948+
number2: processedTags.number2 ?? null,
949+
number3: processedTags.number3 ?? null,
950+
number4: processedTags.number4 ?? null,
951+
number5: processedTags.number5 ?? null,
952+
date1: processedTags.date1 ?? null,
953+
date2: processedTags.date2 ?? null,
954+
boolean1: processedTags.boolean1 ?? null,
955+
boolean2: processedTags.boolean2 ?? null,
956+
boolean3: processedTags.boolean3 ?? null,
846957
}
847958

848-
if (documentRecords.length > 0) {
849-
await tx.insert(document).values(documentRecords)
850-
logger.info(
851-
`[${requestId}] Bulk created ${documentRecords.length} document records in knowledge base ${knowledgeBaseId}`
852-
)
959+
documentRecords.push(newDocument)
960+
returnData.push({
961+
documentId,
962+
filename: docData.filename,
963+
fileUrl: docData.fileUrl,
964+
fileSize: docData.fileSize,
965+
mimeType: docData.mimeType,
966+
})
967+
}
853968

854-
await tx
855-
.update(knowledgeBase)
856-
.set({ updatedAt: now })
857-
.where(eq(knowledgeBase.id, knowledgeBaseId))
969+
if (documentRecords.length > 0) {
970+
const insertedCount = await insertDocumentsIfKbAlive(documentRecords, knowledgeBaseId)
971+
if (insertedCount === 0) {
972+
throw new Error('Knowledge base not found')
858973
}
974+
logger.info(
975+
`[${requestId}] Bulk created ${insertedCount} document records in knowledge base ${knowledgeBaseId}`
976+
)
859977

860-
return returnData
861-
})
978+
await db
979+
.update(knowledgeBase)
980+
.set({ updatedAt: now })
981+
.where(eq(knowledgeBase.id, knowledgeBaseId))
982+
}
983+
984+
return returnData
862985
}
863986

864987
export interface TagFilterCondition {
@@ -1297,7 +1420,7 @@ export async function createSingleDocument(
12971420
}
12981421
}
12991422

1300-
const newDocument = {
1423+
const newDocument: NewDocumentRow = {
13011424
id: documentId,
13021425
knowledgeBaseId,
13031426
filename: documentData.filename,
@@ -1307,31 +1430,21 @@ export async function createSingleDocument(
13071430
chunkCount: 0,
13081431
tokenCount: 0,
13091432
characterCount: 0,
1433+
processingStatus: 'pending',
13101434
enabled: true,
13111435
uploadedAt: now,
13121436
...processedTags,
13131437
}
13141438

1315-
await db.transaction(async (tx) => {
1316-
await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)
1317-
1318-
const kb = await tx
1319-
.select({ id: knowledgeBase.id })
1320-
.from(knowledgeBase)
1321-
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
1322-
.limit(1)
1323-
1324-
if (kb.length === 0) {
1325-
throw new Error('Knowledge base not found')
1326-
}
1327-
1328-
await tx.insert(document).values(newDocument)
1439+
const insertedCount = await insertDocumentsIfKbAlive([newDocument], knowledgeBaseId)
1440+
if (insertedCount === 0) {
1441+
throw new Error('Knowledge base not found')
1442+
}
13291443

1330-
await tx
1331-
.update(knowledgeBase)
1332-
.set({ updatedAt: now })
1333-
.where(eq(knowledgeBase.id, knowledgeBaseId))
1334-
})
1444+
await db
1445+
.update(knowledgeBase)
1446+
.set({ updatedAt: now })
1447+
.where(eq(knowledgeBase.id, knowledgeBaseId))
13351448
logger.info(`[${requestId}] Document created: ${documentId} in knowledge base ${knowledgeBaseId}`)
13361449

13371450
return newDocument as {

apps/sim/lib/workspaces/lifecycle.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ describe('workspace lifecycle', () => {
5555
})
5656

5757
const tx = {
58+
execute: vi.fn().mockResolvedValue([]),
5859
select: vi.fn().mockReturnValue({
5960
from: vi.fn().mockReturnValue({
6061
where: vi.fn().mockResolvedValue([{ id: 'kb-1' }]),

apps/sim/lib/workspaces/lifecycle.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ export async function archiveWorkspace(
4949
.where(eq(workflowMcpServer.workspaceId, workspaceId))
5050

5151
await db.transaction(async (tx) => {
52+
// Workspace archival is a rare admin/cleanup operation that touches every
53+
// child table; on large workspaces it can exceed the 30s session default.
54+
// Override per-tx with a generous ceiling — if it ever runs longer than
55+
// this something is genuinely wrong.
56+
await tx.execute(sql`SET LOCAL statement_timeout = '5min'`)
57+
await tx.execute(sql`SET LOCAL lock_timeout = '30s'`)
58+
5259
await tx
5360
.update(knowledgeBase)
5461
.set({

packages/db/db.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ const postgresClient = postgres(connectionString, {
1313
connect_timeout: 30,
1414
max: 15,
1515
onnotice: () => {},
16+
// Server-side guards. lock_timeout cancels a query waiting on a row lock for
17+
// >5s (e.g. another tx holding `SELECT ... FOR UPDATE`). statement_timeout
18+
// cancels any query running >30s. Heavy paths that legitimately need longer
19+
// (table service bulk JSONB rewrites) override per-tx with `SET LOCAL`.
20+
connection: {
21+
lock_timeout: 5_000,
22+
statement_timeout: 30_000,
23+
},
1624
})
1725

1826
export const db = drizzle(postgresClient, { schema })

0 commit comments

Comments
 (0)