Skip to content

Commit c332efd

Browse files
committed
support streaming and async paths'
1 parent d2e4afd commit c332efd

File tree

10 files changed

+153
-58
lines changed

10 files changed

+153
-58
lines changed

apps/docs/content/docs/en/execution/costs.mdx

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -217,16 +217,16 @@ Different subscription plans have different usage limits:
217217

218218
Workflows have maximum execution time limits based on your subscription plan:
219219

220-
| Plan | Sync Execution Limit |
221-
|------|---------------------|
222-
| **Free** | 5 minutes |
223-
| **Pro** | 60 minutes |
224-
| **Team** | 60 minutes |
225-
| **Enterprise** | 60 minutes |
220+
| Plan | Sync Execution | Async Execution |
221+
|------|----------------|-----------------|
222+
| **Free** | 5 minutes | 10 minutes |
223+
| **Pro** | 60 minutes | 90 minutes |
224+
| **Team** | 60 minutes | 90 minutes |
225+
| **Enterprise** | 60 minutes | 90 minutes |
226226

227227
**Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (default) or through the UI.
228228

229-
**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background with a 90-minute time limit for all plans.
229+
**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background. Async time limits are 2x the sync limit, capped at 90 minutes.
230230

231231
<Callout type="info">
232232
If a workflow exceeds its time limit, it will be terminated and marked as failed with a timeout error. Design long-running workflows to use async execution or break them into smaller workflows.

apps/sim/app/api/tools/video/route.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createLogger } from '@sim/logger'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { checkInternalAuth } from '@/lib/auth/hybrid'
4-
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
4+
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
55
import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server'
66
import type { UserFile } from '@/executor/types'
77
import type { VideoRequestBody } from '@/tools/video/types'
@@ -328,7 +328,7 @@ async function generateWithRunway(
328328
logger.info(`[${requestId}] Runway task created: ${taskId}`)
329329

330330
const pollIntervalMs = 5000
331-
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
331+
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
332332
let attempts = 0
333333

334334
while (attempts < maxAttempts) {
@@ -372,7 +372,7 @@ async function generateWithRunway(
372372
attempts++
373373
}
374374

375-
throw new Error('Runway generation timed out after 10 minutes')
375+
throw new Error('Runway generation timed out')
376376
}
377377

378378
async function generateWithVeo(
@@ -432,7 +432,7 @@ async function generateWithVeo(
432432
logger.info(`[${requestId}] Veo operation created: ${operationName}`)
433433

434434
const pollIntervalMs = 5000
435-
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
435+
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
436436
let attempts = 0
437437

438438
while (attempts < maxAttempts) {
@@ -488,7 +488,7 @@ async function generateWithVeo(
488488
attempts++
489489
}
490490

491-
throw new Error('Veo generation timed out after 5 minutes')
491+
throw new Error('Veo generation timed out')
492492
}
493493

494494
async function generateWithLuma(
@@ -545,7 +545,7 @@ async function generateWithLuma(
545545
logger.info(`[${requestId}] Luma generation created: ${generationId}`)
546546

547547
const pollIntervalMs = 5000
548-
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
548+
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
549549
let attempts = 0
550550

551551
while (attempts < maxAttempts) {
@@ -596,7 +596,7 @@ async function generateWithLuma(
596596
attempts++
597597
}
598598

599-
throw new Error('Luma generation timed out after 10 minutes')
599+
throw new Error('Luma generation timed out')
600600
}
601601

602602
async function generateWithMiniMax(
@@ -663,7 +663,7 @@ async function generateWithMiniMax(
663663
logger.info(`[${requestId}] MiniMax task created: ${taskId}`)
664664

665665
const pollIntervalMs = 5000
666-
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
666+
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
667667
let attempts = 0
668668

669669
while (attempts < maxAttempts) {
@@ -746,7 +746,7 @@ async function generateWithMiniMax(
746746
attempts++
747747
}
748748

749-
throw new Error('MiniMax generation timed out after 10 minutes')
749+
throw new Error('MiniMax generation timed out')
750750
}
751751

752752
// Helper function to strip subpaths from Fal.ai model IDs for status/result endpoints
@@ -865,7 +865,7 @@ async function generateWithFalAI(
865865
const baseModelId = getBaseModelId(falModelId)
866866

867867
const pollIntervalMs = 5000
868-
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
868+
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
869869
let attempts = 0
870870

871871
while (attempts < maxAttempts) {
@@ -942,7 +942,7 @@ async function generateWithFalAI(
942942
attempts++
943943
}
944944

945-
throw new Error('Fal.ai generation timed out after 8 minutes')
945+
throw new Error('Fal.ai generation timed out')
946946
}
947947

948948
function getVideoDimensions(

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
517517
cachedWorkflowData?.blocks || {}
518518
)
519519
const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables
520+
const streamingTimeout = preprocessResult.executionTimeout?.sync
520521

521522
const stream = await createStreamingResponse({
522523
requestId,
@@ -535,6 +536,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
535536
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
536537
includeFileBase64,
537538
base64MaxBytes,
539+
abortSignal: streamingTimeout ? AbortSignal.timeout(streamingTimeout) : undefined,
538540
},
539541
executionId,
540542
})

apps/sim/background/schedule-execution.ts

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { task } from '@trigger.dev/sdk'
44
import { Cron } from 'croner'
55
import { eq } from 'drizzle-orm'
66
import { v4 as uuidv4 } from 'uuid'
7+
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
78
import { preprocessExecution } from '@/lib/execution/preprocessing'
89
import { LoggingSession } from '@/lib/logs/execution/logging-session'
910
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -120,13 +121,15 @@ async function runWorkflowExecution({
120121
loggingSession,
121122
requestId,
122123
executionId,
124+
asyncTimeout,
123125
}: {
124126
payload: ScheduleExecutionPayload
125127
workflowRecord: WorkflowRecord
126128
actorUserId: string
127129
loggingSession: LoggingSession
128130
requestId: string
129131
executionId: string
132+
asyncTimeout?: number
130133
}): Promise<RunWorkflowResult> {
131134
try {
132135
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
@@ -181,15 +184,38 @@ async function runWorkflowExecution({
181184
[]
182185
)
183186

184-
const executionResult = await executeWorkflowCore({
185-
snapshot,
186-
callbacks: {},
187-
loggingSession,
188-
includeFileBase64: true,
189-
base64MaxBytes: undefined,
190-
})
187+
const abortController = new AbortController()
188+
let isTimedOut = false
189+
let timeoutId: NodeJS.Timeout | undefined
190+
191+
if (asyncTimeout) {
192+
timeoutId = setTimeout(() => {
193+
isTimedOut = true
194+
abortController.abort()
195+
}, asyncTimeout)
196+
}
197+
198+
let executionResult
199+
try {
200+
executionResult = await executeWorkflowCore({
201+
snapshot,
202+
callbacks: {},
203+
loggingSession,
204+
includeFileBase64: true,
205+
base64MaxBytes: undefined,
206+
abortSignal: abortController.signal,
207+
})
208+
} finally {
209+
if (timeoutId) clearTimeout(timeoutId)
210+
}
191211

192-
if (executionResult.status === 'paused') {
212+
if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) {
213+
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
214+
logger.info(`[${requestId}] Scheduled workflow execution timed out`, {
215+
timeoutMs: asyncTimeout,
216+
})
217+
await loggingSession.markAsFailed(timeoutErrorMessage)
218+
} else if (executionResult.status === 'paused') {
193219
if (!executionResult.snapshotSeed) {
194220
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
195221
executionId,
@@ -453,6 +479,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
453479
loggingSession,
454480
requestId,
455481
executionId,
482+
asyncTimeout: preprocessResult.executionTimeout?.async,
456483
})
457484

458485
if (executionResult.status === 'skip') {

apps/sim/background/webhook-execution.ts

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import { createLogger } from '@sim/logger'
44
import { task } from '@trigger.dev/sdk'
55
import { eq } from 'drizzle-orm'
66
import { v4 as uuidv4 } from 'uuid'
7+
import { getHighestPrioritySubscription } from '@/lib/billing'
8+
import { getExecutionTimeout, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
79
import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
10+
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
811
import { processExecutionFiles } from '@/lib/execution/files'
912
import { LoggingSession } from '@/lib/logs/execution/logging-session'
1013
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -134,7 +137,22 @@ async function executeWebhookJobInternal(
134137
requestId
135138
)
136139

137-
// Track deploymentVersionId at function scope so it's available in catch block
140+
const userSubscription = await getHighestPrioritySubscription(payload.userId)
141+
const asyncTimeout = getExecutionTimeout(
142+
userSubscription?.plan as SubscriptionPlan | undefined,
143+
'async'
144+
)
145+
const abortController = new AbortController()
146+
let isTimedOut = false
147+
let timeoutId: NodeJS.Timeout | undefined
148+
149+
if (asyncTimeout) {
150+
timeoutId = setTimeout(() => {
151+
isTimedOut = true
152+
abortController.abort()
153+
}, asyncTimeout)
154+
}
155+
138156
let deploymentVersionId: string | undefined
139157

140158
try {
@@ -241,11 +259,18 @@ async function executeWebhookJobInternal(
241259
snapshot,
242260
callbacks: {},
243261
loggingSession,
244-
includeFileBase64: true, // Enable base64 hydration
245-
base64MaxBytes: undefined, // Use default limit
262+
includeFileBase64: true,
263+
base64MaxBytes: undefined,
264+
abortSignal: abortController.signal,
246265
})
247266

248-
if (executionResult.status === 'paused') {
267+
if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) {
268+
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
269+
logger.info(`[${requestId}] Airtable webhook execution timed out`, {
270+
timeoutMs: asyncTimeout,
271+
})
272+
await loggingSession.markAsFailed(timeoutErrorMessage)
273+
} else if (executionResult.status === 'paused') {
249274
if (!executionResult.snapshotSeed) {
250275
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
251276
executionId,
@@ -497,9 +522,14 @@ async function executeWebhookJobInternal(
497522
callbacks: {},
498523
loggingSession,
499524
includeFileBase64: true,
525+
abortSignal: abortController.signal,
500526
})
501527

502-
if (executionResult.status === 'paused') {
528+
if (executionResult.status === 'cancelled' && isTimedOut && asyncTimeout) {
529+
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
530+
logger.info(`[${requestId}] Webhook execution timed out`, { timeoutMs: asyncTimeout })
531+
await loggingSession.markAsFailed(timeoutErrorMessage)
532+
} else if (executionResult.status === 'paused') {
503533
if (!executionResult.snapshotSeed) {
504534
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
505535
executionId,
@@ -601,6 +631,8 @@ async function executeWebhookJobInternal(
601631
}
602632

603633
throw error
634+
} finally {
635+
if (timeoutId) clearTimeout(timeoutId)
604636
}
605637
}
606638

apps/sim/background/workflow-execution.ts

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@sim/logger'
22
import { task } from '@trigger.dev/sdk'
33
import { v4 as uuidv4 } from 'uuid'
4+
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
45
import { preprocessExecution } from '@/lib/execution/preprocessing'
56
import { LoggingSession } from '@/lib/logs/execution/logging-session'
67
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -103,15 +104,37 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
103104
[]
104105
)
105106

106-
const result = await executeWorkflowCore({
107-
snapshot,
108-
callbacks: {},
109-
loggingSession,
110-
includeFileBase64: true,
111-
base64MaxBytes: undefined,
112-
})
107+
const asyncTimeout = preprocessResult.executionTimeout?.async
108+
const abortController = new AbortController()
109+
let isTimedOut = false
110+
let timeoutId: NodeJS.Timeout | undefined
111+
112+
if (asyncTimeout) {
113+
timeoutId = setTimeout(() => {
114+
isTimedOut = true
115+
abortController.abort()
116+
}, asyncTimeout)
117+
}
118+
119+
let result
120+
try {
121+
result = await executeWorkflowCore({
122+
snapshot,
123+
callbacks: {},
124+
loggingSession,
125+
includeFileBase64: true,
126+
base64MaxBytes: undefined,
127+
abortSignal: abortController.signal,
128+
})
129+
} finally {
130+
if (timeoutId) clearTimeout(timeoutId)
131+
}
113132

114-
if (result.status === 'paused') {
133+
if (result.status === 'cancelled' && isTimedOut && asyncTimeout) {
134+
const timeoutErrorMessage = getTimeoutErrorMessage(null, asyncTimeout)
135+
logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: asyncTimeout })
136+
await loggingSession.markAsFailed(timeoutErrorMessage)
137+
} else if (result.status === 'paused') {
115138
if (!result.snapshotSeed) {
116139
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
117140
executionId,

0 commit comments

Comments
 (0)