Skip to content

Commit 85284eb

Browse files
waleedlatif1claudeSg312
committed
fix(terminal): reconnect to running executions after page refresh (#3200)
* fix(terminal): reconnect to running executions after page refresh * fix(terminal): use ExecutionEvent type instead of any in reconnection stream * fix(execution): type event buffer with ExecutionEvent instead of Record<string, unknown> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(execution): validate fromEventId query param in reconnection endpoint Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix some bugs * fix(variables): fix tag dropdown and cursor alignment in variables block (#3199) * feat(confluence): added list space labels, delete label, delete page prop (#3201) * updated route * ack comments * fix(execution): reset execution state in reconnection cleanup to unblock re-entry Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(execution): restore running entries when reconnection is interrupted by navigation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * done * remove cast in ioredis types * ack PR comments --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Siddharth Ganesan <siddharthganesan@gmail.com>
1 parent d068ed6 commit 85284eb

File tree

12 files changed

+945
-171
lines changed

12 files changed

+945
-171
lines changed

apps/sim/app/api/workflows/[id]/deployments/[version]/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const patchBodySchema = z
2929
description: z
3030
.string()
3131
.trim()
32-
.max(500, 'Description must be 500 characters or less')
32+
.max(2000, 'Description must be 2000 characters or less')
3333
.nullable()
3434
.optional(),
3535
isActive: z.literal(true).optional(), // Set to true to activate this version

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
import { generateRequestId } from '@/lib/core/utils/request'
1313
import { SSE_HEADERS } from '@/lib/core/utils/sse'
1414
import { getBaseUrl } from '@/lib/core/utils/urls'
15-
import { markExecutionCancelled } from '@/lib/execution/cancellation'
15+
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
1616
import { processInputFileFields } from '@/lib/execution/files'
1717
import { preprocessExecution } from '@/lib/execution/preprocessing'
1818
import { LoggingSession } from '@/lib/logs/execution/logging-session'
@@ -700,15 +700,27 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
700700
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
701701
let isStreamClosed = false
702702

703+
const eventWriter = createExecutionEventWriter(executionId)
704+
setExecutionMeta(executionId, {
705+
status: 'active',
706+
userId: actorUserId,
707+
workflowId,
708+
}).catch(() => {})
709+
703710
const stream = new ReadableStream<Uint8Array>({
704711
async start(controller) {
705-
const sendEvent = (event: ExecutionEvent) => {
706-
if (isStreamClosed) return
712+
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
707713

708-
try {
709-
controller.enqueue(encodeSSEEvent(event))
710-
} catch {
711-
isStreamClosed = true
714+
const sendEvent = (event: ExecutionEvent) => {
715+
if (!isStreamClosed) {
716+
try {
717+
controller.enqueue(encodeSSEEvent(event))
718+
} catch {
719+
isStreamClosed = true
720+
}
721+
}
722+
if (event.type !== 'stream:chunk' && event.type !== 'stream:done') {
723+
eventWriter.write(event).catch(() => {})
712724
}
713725
}
714726

@@ -829,14 +841,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
829841

830842
const reader = streamingExec.stream.getReader()
831843
const decoder = new TextDecoder()
832-
let chunkCount = 0
833844

834845
try {
835846
while (true) {
836847
const { done, value } = await reader.read()
837848
if (done) break
838849

839-
chunkCount++
840850
const chunk = decoder.decode(value, { stream: true })
841851
sendEvent({
842852
type: 'stream:chunk',
@@ -951,6 +961,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
951961
duration: result.metadata?.duration || 0,
952962
},
953963
})
964+
finalMetaStatus = 'error'
954965
} else {
955966
logger.info(`[${requestId}] Workflow execution was cancelled`)
956967

@@ -963,6 +974,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
963974
duration: result.metadata?.duration || 0,
964975
},
965976
})
977+
finalMetaStatus = 'cancelled'
966978
}
967979
return
968980
}
@@ -986,6 +998,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
986998
endTime: result.metadata?.endTime || new Date().toISOString(),
987999
},
9881000
})
1001+
finalMetaStatus = 'complete'
9891002
} catch (error: unknown) {
9901003
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()
9911004
const errorMessage = isTimeout
@@ -1017,7 +1030,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
10171030
duration: executionResult?.metadata?.duration || 0,
10181031
},
10191032
})
1033+
finalMetaStatus = 'error'
10201034
} finally {
1035+
try {
1036+
await eventWriter.close()
1037+
} catch (closeError) {
1038+
logger.warn(`[${requestId}] Failed to close event writer`, {
1039+
error: closeError instanceof Error ? closeError.message : String(closeError),
1040+
})
1041+
}
1042+
if (finalMetaStatus) {
1043+
setExecutionMeta(executionId, { status: finalMetaStatus }).catch(() => {})
1044+
}
10211045
timeoutController.cleanup()
10221046
if (executionId) {
10231047
await cleanupExecutionBase64Cache(executionId)
@@ -1032,10 +1056,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
10321056
},
10331057
cancel() {
10341058
isStreamClosed = true
1035-
timeoutController.cleanup()
1036-
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
1037-
timeoutController.abort()
1038-
markExecutionCancelled(executionId).catch(() => {})
1059+
logger.info(`[${requestId}] Client disconnected from SSE stream`)
10391060
},
10401061
})
10411062

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import { createLogger } from '@sim/logger'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { checkHybridAuth } from '@/lib/auth/hybrid'
4+
import { SSE_HEADERS } from '@/lib/core/utils/sse'
5+
import {
6+
type ExecutionStreamStatus,
7+
getExecutionMeta,
8+
readExecutionEvents,
9+
} from '@/lib/execution/event-buffer'
10+
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
11+
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
12+
13+
const logger = createLogger('ExecutionStreamReconnectAPI')
14+
15+
const POLL_INTERVAL_MS = 500
16+
const MAX_POLL_DURATION_MS = 10 * 60 * 1000 // 10 minutes
17+
18+
function isTerminalStatus(status: ExecutionStreamStatus): boolean {
19+
return status === 'complete' || status === 'error' || status === 'cancelled'
20+
}
21+
22+
export const runtime = 'nodejs'
23+
export const dynamic = 'force-dynamic'
24+
25+
export async function GET(
26+
req: NextRequest,
27+
{ params }: { params: Promise<{ id: string; executionId: string }> }
28+
) {
29+
const { id: workflowId, executionId } = await params
30+
31+
try {
32+
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
33+
if (!auth.success || !auth.userId) {
34+
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
35+
}
36+
37+
const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({
38+
workflowId,
39+
userId: auth.userId,
40+
action: 'read',
41+
})
42+
if (!workflowAuthorization.allowed) {
43+
return NextResponse.json(
44+
{ error: workflowAuthorization.message || 'Access denied' },
45+
{ status: workflowAuthorization.status }
46+
)
47+
}
48+
49+
const meta = await getExecutionMeta(executionId)
50+
if (!meta) {
51+
return NextResponse.json({ error: 'Execution buffer not found or expired' }, { status: 404 })
52+
}
53+
54+
if (meta.workflowId && meta.workflowId !== workflowId) {
55+
return NextResponse.json(
56+
{ error: 'Execution does not belong to this workflow' },
57+
{ status: 403 }
58+
)
59+
}
60+
61+
const fromParam = req.nextUrl.searchParams.get('from')
62+
const parsed = fromParam ? Number.parseInt(fromParam, 10) : 0
63+
const fromEventId = Number.isFinite(parsed) && parsed >= 0 ? parsed : 0
64+
65+
logger.info('Reconnection stream requested', {
66+
workflowId,
67+
executionId,
68+
fromEventId,
69+
metaStatus: meta.status,
70+
})
71+
72+
const encoder = new TextEncoder()
73+
74+
let closed = false
75+
76+
const stream = new ReadableStream<Uint8Array>({
77+
async start(controller) {
78+
let lastEventId = fromEventId
79+
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS
80+
81+
const enqueue = (text: string) => {
82+
if (closed) return
83+
try {
84+
controller.enqueue(encoder.encode(text))
85+
} catch {
86+
closed = true
87+
}
88+
}
89+
90+
try {
91+
const events = await readExecutionEvents(executionId, lastEventId)
92+
for (const entry of events) {
93+
if (closed) return
94+
enqueue(formatSSEEvent(entry.event))
95+
lastEventId = entry.eventId
96+
}
97+
98+
const currentMeta = await getExecutionMeta(executionId)
99+
if (!currentMeta || isTerminalStatus(currentMeta.status)) {
100+
enqueue('data: [DONE]\n\n')
101+
if (!closed) controller.close()
102+
return
103+
}
104+
105+
while (!closed && Date.now() < pollDeadline) {
106+
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
107+
if (closed) return
108+
109+
const newEvents = await readExecutionEvents(executionId, lastEventId)
110+
for (const entry of newEvents) {
111+
if (closed) return
112+
enqueue(formatSSEEvent(entry.event))
113+
lastEventId = entry.eventId
114+
}
115+
116+
const polledMeta = await getExecutionMeta(executionId)
117+
if (!polledMeta || isTerminalStatus(polledMeta.status)) {
118+
const finalEvents = await readExecutionEvents(executionId, lastEventId)
119+
for (const entry of finalEvents) {
120+
if (closed) return
121+
enqueue(formatSSEEvent(entry.event))
122+
lastEventId = entry.eventId
123+
}
124+
enqueue('data: [DONE]\n\n')
125+
if (!closed) controller.close()
126+
return
127+
}
128+
}
129+
130+
if (!closed) {
131+
logger.warn('Reconnection stream poll deadline reached', { executionId })
132+
enqueue('data: [DONE]\n\n')
133+
controller.close()
134+
}
135+
} catch (error) {
136+
logger.error('Error in reconnection stream', {
137+
executionId,
138+
error: error instanceof Error ? error.message : String(error),
139+
})
140+
if (!closed) {
141+
try {
142+
controller.close()
143+
} catch {}
144+
}
145+
}
146+
},
147+
cancel() {
148+
closed = true
149+
logger.info('Client disconnected from reconnection stream', { executionId })
150+
},
151+
})
152+
153+
return new NextResponse(stream, {
154+
headers: {
155+
...SSE_HEADERS,
156+
'X-Execution-Id': executionId,
157+
},
158+
})
159+
} catch (error: any) {
160+
logger.error('Failed to start reconnection stream', {
161+
workflowId,
162+
executionId,
163+
error: error.message,
164+
})
165+
return NextResponse.json(
166+
{ error: error.message || 'Failed to start reconnection stream' },
167+
{ status: 500 }
168+
)
169+
}
170+
}

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/general/components/version-description-modal.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ export function VersionDescriptionModal({
113113
className='min-h-[120px] resize-none'
114114
value={description}
115115
onChange={(e) => setDescription(e.target.value)}
116-
maxLength={500}
116+
maxLength={2000}
117117
disabled={isGenerating}
118118
/>
119119
<div className='flex items-center justify-between'>
@@ -123,7 +123,7 @@ export function VersionDescriptionModal({
123123
</p>
124124
)}
125125
{!updateMutation.error && !generateMutation.error && <div />}
126-
<p className='text-[11px] text-[var(--text-tertiary)]'>{description.length}/500</p>
126+
<p className='text-[11px] text-[var(--text-tertiary)]'>{description.length}/2000</p>
127127
</div>
128128
</ModalBody>
129129
<ModalFooter>

0 commit comments

Comments
 (0)