Skip to content

Commit 066850b

Browse files
committed
fix streaming path
1 parent c332efd commit 066850b

File tree

2 files changed

+38
-15
lines changed

2 files changed

+38
-15
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -517,8 +517,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
517517
cachedWorkflowData?.blocks || {}
518518
)
519519
const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables
520-
const streamingTimeout = preprocessResult.executionTimeout?.sync
521-
522520
const stream = await createStreamingResponse({
523521
requestId,
524522
workflow: {
@@ -536,7 +534,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
536534
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
537535
includeFileBase64,
538536
base64MaxBytes,
539-
abortSignal: streamingTimeout ? AbortSignal.timeout(streamingTimeout) : undefined,
537+
timeoutMs: preprocessResult.executionTimeout?.sync,
540538
},
541539
executionId,
542540
})

apps/sim/lib/workflows/streaming/streaming.ts

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { createLogger } from '@sim/logger'
2+
import { getTimeoutErrorMessage } from '@/lib/core/execution-limits'
23
import {
34
extractBlockIdFromOutputId,
45
extractPathFromOutputId,
@@ -32,7 +33,7 @@ export interface StreamingConfig {
3233
workflowTriggerType?: 'api' | 'chat'
3334
includeFileBase64?: boolean
3435
base64MaxBytes?: number
35-
abortSignal?: AbortSignal
36+
timeoutMs?: number
3637
}
3738

3839
export interface StreamingResponseOptions {
@@ -269,6 +270,18 @@ export async function createStreamingResponse(
269270
}
270271
}
271272

273+
const timeoutMs = streamConfig.timeoutMs
274+
const abortController = new AbortController()
275+
let isTimedOut = false
276+
let timeoutId: NodeJS.Timeout | undefined
277+
278+
if (timeoutMs) {
279+
timeoutId = setTimeout(() => {
280+
isTimedOut = true
281+
abortController.abort()
282+
}, timeoutMs)
283+
}
284+
272285
try {
273286
const result = await executeWorkflow(
274287
workflow,
@@ -285,7 +298,7 @@ export async function createStreamingResponse(
285298
skipLoggingComplete: true,
286299
includeFileBase64: streamConfig.includeFileBase64,
287300
base64MaxBytes: streamConfig.base64MaxBytes,
288-
abortSignal: streamConfig.abortSignal,
301+
abortSignal: abortController.signal,
289302
},
290303
executionId
291304
)
@@ -295,18 +308,28 @@ export async function createStreamingResponse(
295308
processStreamingBlockLogs(result.logs, state.streamedContent)
296309
}
297310

298-
await completeLoggingSession(result)
311+
if (result.status === 'cancelled' && isTimedOut && timeoutMs) {
312+
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutMs)
313+
logger.info(`[${requestId}] Streaming execution timed out`, { timeoutMs })
314+
if (result._streamingMetadata?.loggingSession) {
315+
await result._streamingMetadata.loggingSession.markAsFailed(timeoutErrorMessage)
316+
}
317+
controller.enqueue(encodeSSE({ event: 'error', error: timeoutErrorMessage }))
318+
} else {
319+
await completeLoggingSession(result)
320+
321+
const minimalResult = await buildMinimalResult(
322+
result,
323+
streamConfig.selectedOutputs,
324+
state.streamedContent,
325+
requestId,
326+
streamConfig.includeFileBase64 ?? true,
327+
streamConfig.base64MaxBytes
328+
)
299329

300-
const minimalResult = await buildMinimalResult(
301-
result,
302-
streamConfig.selectedOutputs,
303-
state.streamedContent,
304-
requestId,
305-
streamConfig.includeFileBase64 ?? true,
306-
streamConfig.base64MaxBytes
307-
)
330+
controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
331+
}
308332

309-
controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
310333
controller.enqueue(encodeSSE('[DONE]'))
311334

312335
if (executionId) {
@@ -325,6 +348,8 @@ export async function createStreamingResponse(
325348
}
326349

327350
controller.close()
351+
} finally {
352+
if (timeoutId) clearTimeout(timeoutId)
328353
}
329354
},
330355
})

0 commit comments

Comments
 (0)