Skip to content

Commit 1b3387e

Browse files
committed
feat(async-jobs): async execution with job queue backends
1 parent a627faa commit 1b3387e

File tree

19 files changed

+1117
-199
lines changed

19 files changed

+1117
-199
lines changed

apps/sim/app/api/cron/cleanup-stale-executions/route.ts

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
import { db } from '@sim/db'
1+
import { asyncJobs, db } from '@sim/db'
22
import { workflowExecutionLogs } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
4-
import { and, eq, lt, sql } from 'drizzle-orm'
4+
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
55
import { type NextRequest, NextResponse } from 'next/server'
66
import { verifyCronAuth } from '@/lib/auth/internal'
7+
import { JOB_RETENTION_HOURS } from '@/lib/core/async-jobs'
78
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
89

910
const logger = createLogger('CleanupStaleExecutions')
@@ -80,12 +81,72 @@ export async function GET(request: NextRequest) {
8081

8182
logger.info(`Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}`)
8283

84+
// Clean up stale async jobs (stuck in processing)
85+
let asyncJobsMarkedFailed = 0
86+
87+
try {
88+
const staleAsyncJobs = await db
89+
.update(asyncJobs)
90+
.set({
91+
status: 'failed',
92+
completedAt: new Date(),
93+
error: `Job terminated: stuck in processing for more than ${STALE_THRESHOLD_MINUTES} minutes`,
94+
updatedAt: new Date(),
95+
})
96+
.where(and(eq(asyncJobs.status, 'processing'), lt(asyncJobs.startedAt, staleThreshold)))
97+
.returning({ id: asyncJobs.id })
98+
99+
asyncJobsMarkedFailed = staleAsyncJobs.length
100+
if (asyncJobsMarkedFailed > 0) {
101+
logger.info(`Marked ${asyncJobsMarkedFailed} stale async jobs as failed`)
102+
}
103+
} catch (error) {
104+
logger.error('Failed to clean up stale async jobs:', {
105+
error: error instanceof Error ? error.message : String(error),
106+
})
107+
}
108+
109+
// Delete completed/failed jobs older than retention period
110+
const retentionThreshold = new Date(Date.now() - JOB_RETENTION_HOURS * 60 * 60 * 1000)
111+
let asyncJobsDeleted = 0
112+
113+
try {
114+
const deletedJobs = await db
115+
.delete(asyncJobs)
116+
.where(
117+
and(
118+
inArray(asyncJobs.status, ['completed', 'failed']),
119+
lt(asyncJobs.completedAt, retentionThreshold)
120+
)
121+
)
122+
.returning({ id: asyncJobs.id })
123+
124+
asyncJobsDeleted = deletedJobs.length
125+
if (asyncJobsDeleted > 0) {
126+
logger.info(
127+
`Deleted ${asyncJobsDeleted} old async jobs (retention: ${JOB_RETENTION_HOURS}h)`
128+
)
129+
}
130+
} catch (error) {
131+
logger.error('Failed to delete old async jobs:', {
132+
error: error instanceof Error ? error.message : String(error),
133+
})
134+
}
135+
83136
return NextResponse.json({
84137
success: true,
85-
found: staleExecutions.length,
86-
cleaned,
87-
failed,
88-
thresholdMinutes: STALE_THRESHOLD_MINUTES,
138+
executions: {
139+
found: staleExecutions.length,
140+
cleaned,
141+
failed,
142+
thresholdMinutes: STALE_THRESHOLD_MINUTES,
143+
},
144+
asyncJobs: {
145+
staleMarkedFailed: asyncJobsMarkedFailed,
146+
oldDeleted: asyncJobsDeleted,
147+
staleThresholdMinutes: STALE_THRESHOLD_MINUTES,
148+
retentionHours: JOB_RETENTION_HOURS,
149+
},
89150
})
90151
} catch (error) {
91152
logger.error('Error in stale execution cleanup job:', error)

apps/sim/app/api/jobs/[jobId]/route.ts

Lines changed: 34 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createLogger } from '@sim/logger'
2-
import { runs } from '@trigger.dev/sdk'
32
import { type NextRequest, NextResponse } from 'next/server'
43
import { checkHybridAuth } from '@/lib/auth/hybrid'
4+
import { getJobQueue } from '@/lib/core/async-jobs'
55
import { generateRequestId } from '@/lib/core/utils/request'
66
import { createErrorResponse } from '@/app/api/workflows/utils'
77

@@ -15,8 +15,6 @@ export async function GET(
1515
const requestId = generateRequestId()
1616

1717
try {
18-
logger.debug(`[${requestId}] Getting status for task: ${taskId}`)
19-
2018
const authResult = await checkHybridAuth(request, { requireWorkflowId: false })
2119
if (!authResult.success || !authResult.userId) {
2220
logger.warn(`[${requestId}] Unauthorized task status request`)
@@ -25,76 +23,60 @@ export async function GET(
2523

2624
const authenticatedUserId = authResult.userId
2725

28-
const run = await runs.retrieve(taskId)
26+
const jobQueue = await getJobQueue()
27+
const job = await jobQueue.getJob(taskId)
2928

30-
logger.debug(`[${requestId}] Task ${taskId} status: ${run.status}`)
29+
if (!job) {
30+
return createErrorResponse('Task not found', 404)
31+
}
3132

32-
const payload = run.payload as any
33-
if (payload?.workflowId) {
33+
if (job.metadata?.workflowId) {
3434
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
35-
const accessCheck = await verifyWorkflowAccess(authenticatedUserId, payload.workflowId)
35+
const accessCheck = await verifyWorkflowAccess(
36+
authenticatedUserId,
37+
job.metadata.workflowId as string
38+
)
3639
if (!accessCheck.hasAccess) {
37-
logger.warn(`[${requestId}] User ${authenticatedUserId} denied access to task ${taskId}`, {
38-
workflowId: payload.workflowId,
39-
})
40-
return createErrorResponse('Access denied', 403)
41-
}
42-
logger.debug(`[${requestId}] User ${authenticatedUserId} has access to task ${taskId}`)
43-
} else {
44-
if (payload?.userId && payload.userId !== authenticatedUserId) {
45-
logger.warn(
46-
`[${requestId}] User ${authenticatedUserId} attempted to access task ${taskId} owned by ${payload.userId}`
47-
)
48-
return createErrorResponse('Access denied', 403)
49-
}
50-
if (!payload?.userId) {
51-
logger.warn(
52-
`[${requestId}] Task ${taskId} has no ownership information in payload. Denying access for security.`
53-
)
40+
logger.warn(`[${requestId}] Access denied to workflow ${job.metadata.workflowId}`)
5441
return createErrorResponse('Access denied', 403)
5542
}
43+
} else if (job.metadata?.userId && job.metadata.userId !== authenticatedUserId) {
44+
logger.warn(`[${requestId}] Access denied to user ${job.metadata.userId}`)
45+
return createErrorResponse('Access denied', 403)
46+
} else if (!job.metadata?.userId && !job.metadata?.workflowId) {
47+
logger.warn(`[${requestId}] Access denied to job ${taskId}`)
48+
return createErrorResponse('Access denied', 403)
5649
}
5750

58-
const statusMap = {
59-
QUEUED: 'queued',
60-
WAITING_FOR_DEPLOY: 'queued',
61-
EXECUTING: 'processing',
62-
RESCHEDULED: 'processing',
63-
FROZEN: 'processing',
64-
COMPLETED: 'completed',
65-
CANCELED: 'cancelled',
66-
FAILED: 'failed',
67-
CRASHED: 'failed',
68-
INTERRUPTED: 'failed',
69-
SYSTEM_FAILURE: 'failed',
70-
EXPIRED: 'failed',
71-
} as const
72-
73-
const mappedStatus = statusMap[run.status as keyof typeof statusMap] || 'unknown'
51+
const mappedStatus = job.status === 'pending' ? 'queued' : job.status
7452

7553
const response: any = {
7654
success: true,
7755
taskId,
7856
status: mappedStatus,
7957
metadata: {
80-
startedAt: run.startedAt,
58+
startedAt: job.startedAt,
8159
},
8260
}
8361

84-
if (mappedStatus === 'completed') {
85-
response.output = run.output // This contains the workflow execution results
86-
response.metadata.completedAt = run.finishedAt
87-
response.metadata.duration = run.durationMs
62+
if (job.status === 'completed') {
63+
response.output = job.output
64+
response.metadata.completedAt = job.completedAt
65+
if (job.startedAt && job.completedAt) {
66+
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
67+
}
8868
}
8969

90-
if (mappedStatus === 'failed') {
91-
response.error = run.error
92-
response.metadata.completedAt = run.finishedAt
93-
response.metadata.duration = run.durationMs
70+
if (job.status === 'failed') {
71+
response.error = job.error
72+
response.metadata.completedAt = job.completedAt
73+
if (job.startedAt && job.completedAt) {
74+
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
75+
}
9476
}
9577

96-
if (mappedStatus === 'processing' || mappedStatus === 'queued') {
97-
response.estimatedDuration = 180000 // 3 minutes max from our config
78+
if (job.status === 'processing' || job.status === 'pending') {
79+
response.estimatedDuration = 180000
9880
}
9981

10082
return NextResponse.json(response)

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 47 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
22
import { createLogger } from '@sim/logger'
3-
import { tasks } from '@trigger.dev/sdk'
43
import { and, eq, isNull, lt, lte, not, or, sql } from 'drizzle-orm'
54
import { type NextRequest, NextResponse } from 'next/server'
65
import { verifyCronAuth } from '@/lib/auth/internal'
7-
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
6+
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
87
import { generateRequestId } from '@/lib/core/utils/request'
98
import { executeScheduleJob } from '@/background/schedule-execution'
109

@@ -55,72 +54,57 @@ export async function GET(request: NextRequest) {
5554
logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`)
5655
logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`)
5756

58-
if (isTriggerDevEnabled) {
59-
const triggerPromises = dueSchedules.map(async (schedule) => {
60-
const queueTime = schedule.lastQueuedAt ?? queuedAt
61-
62-
try {
63-
const payload = {
64-
scheduleId: schedule.id,
65-
workflowId: schedule.workflowId,
66-
blockId: schedule.blockId || undefined,
67-
cronExpression: schedule.cronExpression || undefined,
68-
lastRanAt: schedule.lastRanAt?.toISOString(),
69-
failedCount: schedule.failedCount || 0,
70-
now: queueTime.toISOString(),
71-
scheduledFor: schedule.nextRunAt?.toISOString(),
72-
}
73-
74-
const handle = await tasks.trigger('schedule-execution', payload)
75-
logger.info(
76-
`[${requestId}] Queued schedule execution task ${handle.id} for workflow ${schedule.workflowId}`
77-
)
78-
return handle
79-
} catch (error) {
80-
logger.error(
81-
`[${requestId}] Failed to trigger schedule execution for workflow ${schedule.workflowId}`,
82-
error
83-
)
84-
return null
85-
}
86-
})
87-
88-
await Promise.allSettled(triggerPromises)
89-
90-
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions to Trigger.dev`)
91-
} else {
92-
const directExecutionPromises = dueSchedules.map(async (schedule) => {
93-
const queueTime = schedule.lastQueuedAt ?? queuedAt
94-
95-
const payload = {
96-
scheduleId: schedule.id,
97-
workflowId: schedule.workflowId,
98-
blockId: schedule.blockId || undefined,
99-
cronExpression: schedule.cronExpression || undefined,
100-
lastRanAt: schedule.lastRanAt?.toISOString(),
101-
failedCount: schedule.failedCount || 0,
102-
now: queueTime.toISOString(),
103-
scheduledFor: schedule.nextRunAt?.toISOString(),
104-
}
105-
106-
void executeScheduleJob(payload).catch((error) => {
107-
logger.error(
108-
`[${requestId}] Direct schedule execution failed for workflow ${schedule.workflowId}`,
109-
error
110-
)
57+
const jobQueue = await getJobQueue()
58+
59+
const queuePromises = dueSchedules.map(async (schedule) => {
60+
const queueTime = schedule.lastQueuedAt ?? queuedAt
61+
62+
const payload = {
63+
scheduleId: schedule.id,
64+
workflowId: schedule.workflowId,
65+
blockId: schedule.blockId || undefined,
66+
cronExpression: schedule.cronExpression || undefined,
67+
lastRanAt: schedule.lastRanAt?.toISOString(),
68+
failedCount: schedule.failedCount || 0,
69+
now: queueTime.toISOString(),
70+
scheduledFor: schedule.nextRunAt?.toISOString(),
71+
}
72+
73+
try {
74+
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
75+
metadata: { workflowId: schedule.workflowId },
11176
})
112-
11377
logger.info(
114-
`[${requestId}] Queued direct schedule execution for workflow ${schedule.workflowId} (Trigger.dev disabled)`
78+
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
11579
)
116-
})
11780

118-
await Promise.allSettled(directExecutionPromises)
81+
if (shouldExecuteInline()) {
82+
void (async () => {
83+
try {
84+
await jobQueue.startJob(jobId)
85+
const output = await executeScheduleJob(payload)
86+
await jobQueue.completeJob(jobId, output)
87+
} catch (error) {
88+
const errorMessage = error instanceof Error ? error.message : String(error)
89+
logger.error(
90+
`[${requestId}] Schedule execution failed for workflow ${schedule.workflowId}`,
91+
{ jobId, error: errorMessage }
92+
)
93+
await jobQueue.markJobFailed(jobId, errorMessage)
94+
}
95+
})()
96+
}
97+
} catch (error) {
98+
logger.error(
99+
`[${requestId}] Failed to queue schedule execution for workflow ${schedule.workflowId}`,
100+
error
101+
)
102+
}
103+
})
104+
105+
await Promise.allSettled(queuePromises)
119106

120-
logger.info(
121-
`[${requestId}] Queued ${dueSchedules.length} direct schedule executions (Trigger.dev disabled)`
122-
)
123-
}
107+
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions`)
124108

125109
return NextResponse.json({
126110
message: 'Scheduled workflow executions processed',

0 commit comments

Comments
 (0)