Skip to content

Commit db19975

Browse files
committed
fix(terminal): reconnect to running executions after page refresh
1 parent d5a756c commit db19975

File tree

9 files changed

+784
-80
lines changed

9 files changed

+784
-80
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
import { generateRequestId } from '@/lib/core/utils/request'
1313
import { SSE_HEADERS } from '@/lib/core/utils/sse'
1414
import { getBaseUrl } from '@/lib/core/utils/urls'
15-
import { markExecutionCancelled } from '@/lib/execution/cancellation'
15+
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
1616
import { processInputFileFields } from '@/lib/execution/files'
1717
import { preprocessExecution } from '@/lib/execution/preprocessing'
1818
import { LoggingSession } from '@/lib/logs/execution/logging-session'
@@ -700,15 +700,27 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
700700
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
701701
let isStreamClosed = false
702702

703+
const eventWriter = createExecutionEventWriter(executionId)
704+
setExecutionMeta(executionId, {
705+
status: 'active',
706+
userId: actorUserId,
707+
workflowId,
708+
}).catch(() => {})
709+
703710
const stream = new ReadableStream<Uint8Array>({
704711
async start(controller) {
705-
const sendEvent = (event: ExecutionEvent) => {
706-
if (isStreamClosed) return
712+
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
707713

708-
try {
709-
controller.enqueue(encodeSSEEvent(event))
710-
} catch {
711-
isStreamClosed = true
714+
const sendEvent = (event: ExecutionEvent) => {
715+
if (!isStreamClosed) {
716+
try {
717+
controller.enqueue(encodeSSEEvent(event))
718+
} catch {
719+
isStreamClosed = true
720+
}
721+
}
722+
if (event.type !== 'stream:chunk' && event.type !== 'stream:done') {
723+
eventWriter.write(event as unknown as Record<string, unknown>).catch(() => {})
712724
}
713725
}
714726

@@ -951,6 +963,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
951963
duration: result.metadata?.duration || 0,
952964
},
953965
})
966+
finalMetaStatus = 'error'
954967
} else {
955968
logger.info(`[${requestId}] Workflow execution was cancelled`)
956969

@@ -963,6 +976,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
963976
duration: result.metadata?.duration || 0,
964977
},
965978
})
979+
finalMetaStatus = 'cancelled'
966980
}
967981
return
968982
}
@@ -986,6 +1000,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
9861000
endTime: result.metadata?.endTime || new Date().toISOString(),
9871001
},
9881002
})
1003+
finalMetaStatus = 'complete'
9891004
} catch (error: unknown) {
9901005
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()
9911006
const errorMessage = isTimeout
@@ -1017,7 +1032,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
10171032
duration: executionResult?.metadata?.duration || 0,
10181033
},
10191034
})
1035+
finalMetaStatus = 'error'
10201036
} finally {
1037+
await eventWriter.close()
1038+
if (finalMetaStatus) {
1039+
setExecutionMeta(executionId, { status: finalMetaStatus }).catch(() => {})
1040+
}
10211041
timeoutController.cleanup()
10221042
if (executionId) {
10231043
await cleanupExecutionBase64Cache(executionId)
@@ -1032,10 +1052,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
10321052
},
10331053
cancel() {
10341054
isStreamClosed = true
1035-
timeoutController.cleanup()
1036-
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
1037-
timeoutController.abort()
1038-
markExecutionCancelled(executionId).catch(() => {})
1055+
logger.info(`[${requestId}] Client disconnected from SSE stream`)
10391056
},
10401057
})
10411058

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import { createLogger } from '@sim/logger'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { checkHybridAuth } from '@/lib/auth/hybrid'
4+
import { SSE_HEADERS } from '@/lib/core/utils/sse'
5+
import {
6+
type ExecutionStreamStatus,
7+
getExecutionMeta,
8+
readExecutionEvents,
9+
} from '@/lib/execution/event-buffer'
10+
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
11+
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
12+
13+
const logger = createLogger('ExecutionStreamReconnectAPI')
14+
15+
const POLL_INTERVAL_MS = 500
16+
17+
function isTerminalStatus(status: ExecutionStreamStatus): boolean {
18+
return status === 'complete' || status === 'error' || status === 'cancelled'
19+
}
20+
21+
export const runtime = 'nodejs'
22+
export const dynamic = 'force-dynamic'
23+
24+
export async function GET(
25+
req: NextRequest,
26+
{ params }: { params: Promise<{ id: string; executionId: string }> }
27+
) {
28+
const { id: workflowId, executionId } = await params
29+
30+
try {
31+
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
32+
if (!auth.success || !auth.userId) {
33+
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
34+
}
35+
36+
const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({
37+
workflowId,
38+
userId: auth.userId,
39+
action: 'read',
40+
})
41+
if (!workflowAuthorization.allowed) {
42+
return NextResponse.json(
43+
{ error: workflowAuthorization.message || 'Access denied' },
44+
{ status: workflowAuthorization.status }
45+
)
46+
}
47+
48+
const meta = await getExecutionMeta(executionId)
49+
if (!meta) {
50+
return NextResponse.json({ error: 'Execution buffer not found or expired' }, { status: 404 })
51+
}
52+
53+
if (meta.workflowId && meta.workflowId !== workflowId) {
54+
return NextResponse.json(
55+
{ error: 'Execution does not belong to this workflow' },
56+
{ status: 403 }
57+
)
58+
}
59+
60+
const fromParam = req.nextUrl.searchParams.get('from')
61+
const fromEventId = fromParam ? Number.parseInt(fromParam, 10) : 0
62+
63+
logger.info('Reconnection stream requested', {
64+
workflowId,
65+
executionId,
66+
fromEventId,
67+
metaStatus: meta.status,
68+
})
69+
70+
const encoder = new TextEncoder()
71+
72+
const stream = new ReadableStream<Uint8Array>({
73+
async start(controller) {
74+
let lastEventId = fromEventId
75+
let closed = false
76+
77+
const enqueue = (text: string) => {
78+
if (closed) return
79+
try {
80+
controller.enqueue(encoder.encode(text))
81+
} catch {
82+
closed = true
83+
}
84+
}
85+
86+
try {
87+
// Replay buffered events
88+
const events = await readExecutionEvents(executionId, lastEventId)
89+
for (const entry of events) {
90+
if (closed) return
91+
enqueue(formatSSEEvent(entry.event as any))
92+
lastEventId = entry.eventId
93+
}
94+
95+
// Check if execution is already done
96+
const currentMeta = await getExecutionMeta(executionId)
97+
if (!currentMeta || isTerminalStatus(currentMeta.status)) {
98+
enqueue('data: [DONE]\n\n')
99+
if (!closed) controller.close()
100+
return
101+
}
102+
103+
// Poll for new events until execution completes
104+
while (!closed) {
105+
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
106+
if (closed) return
107+
108+
const newEvents = await readExecutionEvents(executionId, lastEventId)
109+
for (const entry of newEvents) {
110+
if (closed) return
111+
enqueue(formatSSEEvent(entry.event as any))
112+
lastEventId = entry.eventId
113+
}
114+
115+
const polledMeta = await getExecutionMeta(executionId)
116+
if (!polledMeta || isTerminalStatus(polledMeta.status)) {
117+
// One final read to catch any events flushed alongside the meta update
118+
const finalEvents = await readExecutionEvents(executionId, lastEventId)
119+
for (const entry of finalEvents) {
120+
if (closed) return
121+
enqueue(formatSSEEvent(entry.event as any))
122+
lastEventId = entry.eventId
123+
}
124+
enqueue('data: [DONE]\n\n')
125+
if (!closed) controller.close()
126+
return
127+
}
128+
}
129+
} catch (error) {
130+
logger.error('Error in reconnection stream', {
131+
executionId,
132+
error: error instanceof Error ? error.message : String(error),
133+
})
134+
if (!closed) {
135+
try {
136+
controller.close()
137+
} catch {}
138+
}
139+
}
140+
},
141+
cancel() {
142+
logger.info('Client disconnected from reconnection stream', { executionId })
143+
},
144+
})
145+
146+
return new NextResponse(stream, {
147+
headers: {
148+
...SSE_HEADERS,
149+
'X-Execution-Id': executionId,
150+
},
151+
})
152+
} catch (error: any) {
153+
logger.error('Failed to start reconnection stream', {
154+
workflowId,
155+
executionId,
156+
error: error.message,
157+
})
158+
return NextResponse.json(
159+
{ error: error.message || 'Failed to start reconnection stream' },
160+
{ status: 500 }
161+
)
162+
}
163+
}

0 commit comments

Comments
 (0)